Aljoscha -
I want to use a RichFoldFunction to get the open() hook. Instead, I cheat and use this structure with a plain (non-Rich) FoldFunction:
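import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;

public class InfiniteResponseFilterFolder implements FoldFunction<Tuple2<Long, Long>, InfiniteResponseFilter> {
    // transient: Flink serializes the function instance when the job is submitted,
    // so the connection is created lazily on the worker instead
    private transient BackingStore backingStore;

    @Override
    public InfiniteResponseFilter fold(InfiniteResponseFilter accumulator, Tuple2<Long, Long> value) throws Exception {
        if (backingStore == null) { // emulating the open() hook on the first call
            initializeBackingStore();
        }
        if (accumulator == null) { // no state yet: seed the filter from the backing store
            accumulator = new InfiniteResponseFilter(backingStore, value);
        }
        return accumulator.incrementFilter(value); // assumed to return the updated filter
    }

    private void initializeBackingStore() {
        // connect to database
        backingStore = ...;
    }
}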
What I want to do is connect to a backing store to read the initial state for a fold operation. The particular operation is a form of infinite impulse response (IIR) filter, namely a triple exponential smoother, where the coefficients for the start state are pre-calculated (and stored in that BackingStore).
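For readers unfamiliar with the operation, the per-observation update of one common additive form of triple exponential smoothing (Holt-Winters) can be sketched as a plain Java accumulator. This is a minimal sketch, not Ron's InfiniteResponseFilter: the class name TripleExponentialSmoother is hypothetical, and the coefficients (alpha, beta, gamma) and start state (level, trend, seasonal offsets) are passed to the constructor rather than read from the BackingStore. Other formulations update the seasonal term from the previous level and trend instead of the new level.

```java
import java.io.Serializable;

/**
 * Hypothetical accumulator for an additive triple exponential (Holt-Winters) smoother.
 * Coefficients and start state are assumed to be pre-calculated elsewhere.
 */
final class TripleExponentialSmoother implements Serializable {
    private static final long serialVersionUID = 1L;

    private final double alpha, beta, gamma; // smoothing coefficients
    private double level;
    private double trend;
    private final double[] seasonal; // one offset per position in the season (length m)
    private long t = 0;              // number of observations folded in so far

    TripleExponentialSmoother(double alpha, double beta, double gamma,
                              double level, double trend, double[] seasonal) {
        this.alpha = alpha;
        this.beta = beta;
        this.gamma = gamma;
        this.level = level;
        this.trend = trend;
        this.seasonal = seasonal.clone();
    }

    /** Folds one observation y into the state and returns this accumulator. */
    TripleExponentialSmoother update(double y) {
        int i = (int) (t % seasonal.length); // season slot of this observation
        double s = seasonal[i];              // seasonal estimate from one season ago
        double prevLevel = level;
        level = alpha * (y - s) + (1 - alpha) * (prevLevel + trend);
        trend = beta * (level - prevLevel) + (1 - beta) * trend;
        seasonal[i] = gamma * (y - level) + (1 - gamma) * s;
        t++;
        return this;
    }

    /** Forecast h >= 1 steps past the last observation. */
    double forecast(int h) {
        int i = (int) ((t + h - 1) % seasonal.length); // slot of observation t + h - 1
        return level + h * trend + seasonal[i];
    }
}
```

Because every field, the coefficients included, lives in the accumulator object, checkpointing the accumulator should also capture the coefficients.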
Further, I want to checkpoint the entire state (including the coefficients) both to Flink’s checkpointing system and to the backing store. The former is handled here; the latter is handled by another transform in my graph.
Is there a better approach?
Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
[hidden email]
M: +1 630 363 8835
> On Apr 13, 2016, at 1:25 AM, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi,
> there are two cases where a FoldFunction can be used in the streaming API: KeyedStream.fold() and WindowedStream.fold()/apply(). In both cases we internally use the partitioned state abstraction of Flink to keep the state. So yes, the accumulator value is consistently maintained and will survive failures.
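A toy, single-process model can make the guarantee Aljoscha describes concrete: a keyed fold keeps one accumulator per key, the whole per-key table is captured at a checkpoint, and recovery resumes from that copy. KeyedFoldModel and its methods are hypothetical names for illustration only; Flink's partitioned state is managed by the runtime, not by user code like this.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Toy model of a keyed fold: one accumulator per key in a state table that is
 * copied on snapshot and swapped back in on restore. Not Flink's implementation.
 */
final class KeyedFoldModel<K, T, R> {
    private final R initialValue;
    private final BiFunction<R, T, R> folder;  // (accumulator, value) -> new accumulator
    private Map<K, R> state = new HashMap<>(); // "partitioned state": one entry per key

    KeyedFoldModel(R initialValue, BiFunction<R, T, R> folder) {
        this.initialValue = initialValue;
        this.folder = folder;
    }

    /** Folds one element into its key's accumulator and returns the new value. */
    R process(K key, T value) {
        R next = folder.apply(state.getOrDefault(key, initialValue), value);
        state.put(key, next);
        return next;
    }

    /** A checkpoint: a shallow copy of the per-key accumulators (assumes R is immutable). */
    Map<K, R> snapshot() {
        return new HashMap<>(state);
    }

    /** Recovery: discard the current table and resume from a checkpoint. */
    void restore(Map<K, R> checkpoint) {
        state = new HashMap<>(checkpoint);
    }
}
```

In the test below, the update to key "a" made after the snapshot is lost on restore; in Flink, a replayable source would re-deliver such elements after recovery, so the accumulator ends up consistent with the input.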
>
> Right now, the accumulation function of a window cannot be a rich function. The underlying state primitives that the windowing system uses only accept plain functions, because supporting rich functions there could have problematic implications. The most obvious one, to me, is that users might try to keep state in the ReduceFunction of a ReducingState if given the chance, and a RichFunction gives them that chance.
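The hazard of keeping state inside the function object can be sketched without Flink: a field of the function is invisible to checkpointing, so after recovery it disagrees with the state the runtime restores. The sketch assumes, as in Flink, that a restarted task works with a fresh instance of the user function; CountingSum and RecoveryDemo are hypothetical names.

```java
import java.util.function.BinaryOperator;

/** Hypothetical reduce function that also keeps private, unmanaged state in a field. */
final class CountingSum implements BinaryOperator<Long> {
    long calls = 0; // invisible to the runtime, so never checkpointed

    @Override
    public Long apply(Long a, Long b) {
        calls++;
        return a + b;
    }
}

final class RecoveryDemo {
    /** Returns {managed sum, calls seen by the current function instance}. */
    static long[] run() {
        CountingSum f = new CountingSum();
        long managed = 0L;              // stands in for managed, checkpointed state
        managed = f.apply(managed, 1L);
        managed = f.apply(managed, 2L);
        long checkpoint = managed;      // checkpoint: managed == 3, f.calls == 2
        managed = f.apply(managed, 3L); // element 3 is processed, then the task fails
        f = new CountingSum();          // recovery: a fresh function instance...
        managed = checkpoint;           // ...while managed state is rolled back
        managed = f.apply(managed, 3L); // the source replays element 3
        return new long[] {managed, f.calls};
    }
}
```

RecoveryDemo.run() returns {6, 1}: the managed sum reflects all three elements, while the field only counted the single call made after recovery.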
>
> This is just off the top of my head, but I can go into more detail if you want.
>
> Cheers,
> Aljoscha
>
> On Wed, 13 Apr 2016 at 00:29 Michael Radford <[hidden email]> wrote:
>>
>> I'm wondering whether the accumulator value maintained by a
>> FoldFunction is automatically checkpointed.
>>
>> I mean in general, but specifically when using the WindowedStream.apply
>> variant that takes a FoldFunction:
>>
>> public <R> DataStream<R> apply(R initialValue,
>> FoldFunction<T,R> foldFunction,
>> WindowFunction<R,R,K,W> function,
>> TypeInformation<R> evidence$7)
>>
>> If not, then Flink 1.0.1 still has the issue that you can't pass a
>> RichFoldFunction to WindowedStream.apply
>> (java.lang.UnsupportedOperationException: ReduceFunction of apply can
>> not be a RichFunction).
>>
>> But also, if not, it seems like this would be a common pattern when
>> doing complex (keyed / multi-valued) aggregations, and if the
>> accumulator type R is serializable, there could be a convenience
>> method for a checkpointed fold, like the mapWithState mentioned in the
>> State section of the streaming guide.
>>
>> Thanks,
>> Mike
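The convenience method Mike suggests maps naturally onto a stateful map: a fold is a mapWithState-style function whose output is simply the new accumulator. Below is a stdlib-only sketch of that correspondence, loosely modelled on the (value, Option[state]) => (output, Option[newState]) shape of the Scala API's mapWithState; FoldViaState.fold is a hypothetical helper, not Flink API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

final class FoldViaState {
    private FoldViaState() {}

    /**
     * Wraps a fold (initial value + folder) as a stateful-map function:
     * (value, Optional<state>) -> (output, Optional<newState>).
     * An empty state means "no accumulator yet for this key", so the initial value is used.
     */
    static <T, R> BiFunction<T, Optional<R>, Map.Entry<R, Optional<R>>> fold(
            R initialValue, BiFunction<R, T, R> folder) {
        return (value, state) -> {
            R next = folder.apply(state.orElse(initialValue), value);
            // The emitted element and the state to keep are the same accumulator.
            return new SimpleEntry<>(next, Optional.of(next));
        };
    }
}
```

With this shape, the runtime only has to checkpoint the Optional state per key, which is exactly the accumulator.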