aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
I have a simple aggregation with one caveat: for some reason I have to keep a large amount of state until the window is garbage-collected. The state lives in the accumulator (ACC). I am hitting a memory bottleneck and would like to offload the state to the state backend (RocksDB), keeping only the state accumulated between checkpoints in memory (which seems like an obvious fix). However, I am not allowed to pass a RichAggregateFunction to the aggregate method of a windowed stream. That raises two questions:

1. Why?
2. Is there an alternative for stateful window aggregation where we manage the state ourselves?

Thanks,
Vishal


Here is the code (it uses generics, but it works):
SingleOutputStreamOperator<OUT> retVal = input
    .keyBy(keySelector)
    .window(EventTimeSessionWindows.withGap(gap))
    .aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            // Each new window starts from a reset copy of a prototype
            // accumulator held in the enclosing scope.
            @Override
            public ACC createAccumulator() {
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {
                accumulator.add(value);
            }

            @Override
            public OUT getResult(ACC accumulator) {
                return accumulator.getLocalValue();
            }

            // Called when session windows are merged.
            @Override
            public ACC merge(ACC a, ACC b) {
                a.merge(b);
                return a;
            }
        },
        // Pass the single pre-aggregated result through unchanged.
        new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
            @Override
            public void apply(KEY s, TimeWindow window, Iterable<OUT> input, Collector<OUT> out) throws Exception {
                out.collect(input.iterator().next());
            }
        },
        accType, aggregationResultType, aggregationResultType);

Re: aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
It seems that this has to do with session windows, which are mergeable? I tried the RichWindowFunction, and that too seems to suggest that one cannot use state. Any ideas, folks?


Re: aggregate does not allow RichAggregateFunction ?

Fabian Hueske-2
Hi Vishal,

You are right, it is not possible to use state in an AggregateFunction because windows need to be mergeable.
An AggregateFunction knows how to merge its accumulators, but merging generic state is not possible.

I am not aware of an efficient and easy workaround for this.
If you want to use the provided session window logic, you can use a WindowFunction that performs all computations when the window is triggered. This means that aggregations do not happen eagerly, and all events for a window are collected and held in state.
Another approach could be to implement the whole logic (incl. the session windowing) using a ProcessFunction. This would be a major effort, though.

Best,
Fabian
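As an illustration of the point in the reply above, here is a minimal plain-Java sketch (not Flink code) of what happens when a late event bridges two sessions: the windows are fused, so whatever was accumulated per window must be fused as well. The names `CountAcc`, `Session`, and `sessionize` are hypothetical and exist only for this example. The accumulator survives the fusion because it defines `merge`; arbitrary user state has no such operation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: mergeable because merge() is defined.
class CountAcc {
    long count;

    void add() {
        count++;
    }

    CountAcc merge(CountAcc other) {
        count += other.count;
        return this;
    }
}

// Hypothetical session window [start, end) carrying its accumulator.
class Session {
    long start;
    long end;
    final CountAcc acc = new CountAcc();

    Session(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Touching windows count as overlapping.
    boolean overlaps(Session o) {
        return start <= o.end && o.start <= end;
    }
}

class SessionMergeSketch {
    // Sessionizes the timestamps of a single key, in arrival order.
    static List<Session> sessionize(long gap, long... timestamps) {
        List<Session> sessions = new ArrayList<>();
        for (long ts : timestamps) {
            // Every event first gets its own window [ts, ts + gap).
            Session merged = new Session(ts, ts + gap);
            merged.acc.add();
            List<Session> kept = new ArrayList<>();
            for (Session s : sessions) {
                if (s.overlaps(merged)) {
                    merged.start = Math.min(merged.start, s.start);
                    merged.end = Math.max(merged.end, s.end);
                    merged.acc.merge(s.acc); // only possible because ACC can merge
                } else {
                    kept.add(s);
                }
            }
            kept.add(merged);
            sessions = kept;
        }
        return sessions;
    }

    public static void main(String[] args) {
        // The event at 8 bridges the sessions [0, 10) and [15, 35).
        List<Session> sessions = sessionize(10, 0, 25, 15, 8);
        for (Session s : sessions) {
            System.out.println("[" + s.start + ", " + s.end + ") count=" + s.acc.count);
        }
        // prints "[0, 35) count=4"
    }
}
```

If the per-window data were opaque state instead of an accumulator with `merge`, the line marked above would have nothing to call, which is the restriction being described.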


Re: aggregate does not allow RichAggregateFunction ?

Aljoscha Krettek
Hi,

If you use an AggregateFunction in this way (i.e. for a window), the ACC should in fact be kept in the state backend. Did you configure the job to use RocksDB? How are the memory problems manifesting?

Best,
Aljoscha


Re: aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
I understand that. Let me elaborate. The sequence of events is:

1. Round-robin dispatch to the Kafka cluster (it is not partitioned on the key, which we may ultimately do, and then I will have more questions on how to key by and still keep order, and probably avoid a shuffle :) ).
2. Key by a high-cardinality key.
3. Sessionize.
4. Because of the round-robin dispatch to Kafka (and even if partitioned on the key with a subsequent keyBy), the sort order is not retained, and the ACC has to hold on to the elements in a List. When the window is finalized, we sort the List in the ACC and do pagination. We are looking for event-based paths within a session, from a source to a sink. I was hoping to use RocksDB state for the final merged list, and thus keep it off-heap, and to use a count-based Trigger to evaluate the ACC and merge the elements collected between trigger firings into the master copy, rather than keeping all events in the ACC (I would imagine this is a very general pattern).

Does that make sense ? 
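The accumulator described in step 4 could look roughly like the plain-Java sketch below. The real ACC is not shown in this thread, so `Event`, `BufferingAcc`, and the `pageSize` parameter are hypothetical stand-ins. It buffers events in arrival order, merges by concatenation, and sorts by timestamp only when the result is requested, which is why every event stays in the accumulator until the window fires.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: an event-time timestamp and a name.
class Event {
    final long timestamp;
    final String name;

    Event(long timestamp, String name) {
        this.timestamp = timestamp;
        this.name = name;
    }
}

// Hypothetical accumulator that keeps every event of the session.
class BufferingAcc {
    private final List<Event> events = new ArrayList<>();

    // Events arrive out of order, so they are only appended here.
    void add(Event e) {
        events.add(e);
    }

    // Merging two sessions concatenates their buffers.
    BufferingAcc merge(BufferingAcc other) {
        events.addAll(other.events);
        return this;
    }

    // Sorts by event time, then cuts the sorted list into pages.
    List<List<Event>> getResult(int pageSize) {
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong(e -> e.timestamp));
        List<List<Event>> pages = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i += pageSize) {
            int end = Math.min(i + pageSize, sorted.size());
            pages.add(new ArrayList<>(sorted.subList(i, end)));
        }
        return pages;
    }

    int size() {
        return events.size();
    }
}
```

The memory cost is visible in `size()`: it grows with every event of the session and never shrinks before the result is produced.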








 


Re: aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
An additional question: if the source (Kafka) is partitioned by key, does a keyBy retain the order of a Kafka partition across a shuffle?


Re: aggregate does not allow RichAggregateFunction ?

Fabian Hueske-2
Hi,

The order of records that are sent from one task to another task is preserved (task refers to the parallel instance of an operator).
However, a task that receives records from multiple input tasks consumes records from its inputs in arbitrary order.

If a job reads from a partitioned Kafka topic and does a keyBy on the partitioning key of the Kafka topic, an operator task that follows the keyBy consumes all records with the same key from exactly one input task (the one reading the Kafka partition for the key).
However, since Flink's and Kafka's partitioning functions are not the same, records from the same partition with different keys can be sent to different tasks.

So:
1) Records from the same partition might not be processed by the same operator (and hence not in order).
2) Records with the same key are processed by the same operator in the same order in which they were read from the partition.

Best,
Fabian
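The two conclusions above can be checked with a small plain-Java simulation. This is a sketch under stated assumptions, not Flink or Kafka code: `Rec`, `KeyOrderSketch`, and `targetTask` are hypothetical, and `targetTask` merely stands in for a key-to-task assignment that differs from the Kafka partitioner. Each input is read first-in, first-out, while the choice of which input to read next is arbitrary.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// A record with its key and its sequence number within that key.
class Rec {
    final String key;
    final int seq;

    Rec(String key, int seq) {
        this.key = key;
        this.seq = seq;
    }
}

class KeyOrderSketch {
    // Hypothetical key-to-task assignment; deterministic per key and
    // unrelated to how records were assigned to Kafka partitions.
    static int targetTask(String key, int parallelism) {
        return Math.floorMod(key.hashCode() * 31 + 7, parallelism);
    }

    // Each inner list is one Kafka partition, read in order by one source
    // task. Returns what each downstream task receives when the inputs are
    // interleaved arbitrarily (here: pseudo-randomly).
    static List<List<Rec>> keyBy(List<List<Rec>> partitions, int parallelism, long seed) {
        Random rnd = new Random(seed);
        List<List<Rec>> tasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new ArrayList<>());
        }
        int[] next = new int[partitions.size()];
        int remaining = 0;
        for (List<Rec> p : partitions) {
            remaining += p.size();
        }
        while (remaining > 0) {
            int src = rnd.nextInt(partitions.size()); // arbitrary input choice
            if (next[src] == partitions.get(src).size()) {
                continue; // this input is exhausted
            }
            Rec r = partitions.get(src).get(next[src]++); // FIFO per input
            tasks.get(targetTask(r.key, parallelism)).add(r);
            remaining--;
        }
        return tasks;
    }

    // True if every task sees the records of each key in increasing seq order.
    static boolean perKeyOrderHolds(List<List<Rec>> tasks) {
        for (List<Rec> task : tasks) {
            Map<String, Integer> last = new HashMap<>();
            for (Rec r : task) {
                Integer prev = last.put(r.key, r.seq);
                if (prev != null && prev > r.seq) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Topic partitioned by key: every key lives in exactly one partition.
        List<List<Rec>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(
                new Rec("a", 0), new Rec("b", 0), new Rec("a", 1), new Rec("b", 1)));
        partitions.add(Arrays.asList(
                new Rec("c", 0), new Rec("c", 1), new Rec("c", 2)));
        System.out.println(perKeyOrderHolds(keyBy(partitions, 2, 42L))); // prints "true"
    }
}
```

If the records of one key were spread over several partitions, as with round-robin dispatch, the same check can fail, because nothing constrains the order in which the inputs are consumed.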


Re: aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
Perfect. In our use case, the Kafka partition key and the keyBy use the exact same field, and thus the order will be preserved.


Re: aggregate does not allow RichAggregateFunction ?

Vishal Santoshi
Hello Fabian,
We decided that it does not make sense to partition the Kafka topic by key because of hot-spot considerations. So we created a way to keep trimmed state in the Accumulator, provided we know the current watermark to keep the trimmed state time-correct. In essence, the paths we look for in a sequence of events in a session are eagerly materialized and emitted using a periodic CountTrigger, followed by truncation of the state.

This requires us to know the current watermark in the Accumulator. We do have the watermark in the Trigger's onElement(), onEventTime() and onProcessingTime() through the TriggerContext, but I see no way to pass it on to the Accumulator. Lazily setting the watermark on the element, which we thought was an instance shared between the invocation of add() on the Accumulator and onElement() on the attached Trigger, does not seem to work in a distributed environment.

I tried the ProcessWindowFunction too. It was promising, as its process method has the Context and thus the watermark, but it suffers from the same issue when using per-window state (windowState(), keyed to window and key) with session windows, throwing
java.lang.UnsupportedOperationException: Per-window state is not allowed when using merging windows.
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$MergingWindowStateStore.getState(WindowOperator.java:720)

Vishal
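One possible substitute, which is not the approach from this thread and not a Flink API, is to let the accumulator derive its own estimate of event-time progress from the largest timestamp it has seen. The plain-Java sketch below assumes bounded out-of-orderness; `TrimmingAcc`, `maxOutOfOrderness`, and `retention` are hypothetical names. The estimate is local to one key and window, so it can differ from the operator's real watermark and should be treated as an approximation only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator that trims itself using a locally estimated
// watermark instead of the operator's watermark.
class TrimmingAcc {
    private final List<Long> timestamps = new ArrayList<>();
    private final long maxOutOfOrderness;
    private final long retention;
    private long maxTimestamp = Long.MIN_VALUE;

    TrimmingAcc(long maxOutOfOrderness, long retention) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.retention = retention;
    }

    void add(long timestamp) {
        timestamps.add(timestamp);
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    TrimmingAcc merge(TrimmingAcc other) {
        timestamps.addAll(other.timestamps);
        maxTimestamp = Math.max(maxTimestamp, other.maxTimestamp);
        return this;
    }

    // Local estimate only: largest timestamp seen minus the assumed bound.
    long estimatedWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestamp - maxOutOfOrderness;
    }

    // Drops entries older than (estimated watermark - retention) and
    // returns how many were dropped.
    int trim() {
        long watermark = estimatedWatermark();
        if (watermark == Long.MIN_VALUE) {
            return 0;
        }
        long cutoff = watermark - retention;
        int before = timestamps.size();
        timestamps.removeIf(ts -> ts < cutoff);
        return before - timestamps.size();
    }

    int size() {
        return timestamps.size();
    }
}
```

For example, with `maxOutOfOrderness = 5` and `retention = 10`, after adding 100, 80, 120, and 104 the estimate is 115 and `trim()` drops the three entries below 105.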

                    


Re: aggregate does not allow RichAggregateFunction ?

chiggi_dev
Hi Fabian,

We came across this issue while working on a RichAggregateFunction. Isn't
generic state mergeable, similar to the ACC merge?

What if I need the Flink class loader in the AggregateFunction? Is there
any way I can get it without a RuntimeContext?

Thanks,

Chirag


