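import org.apache.flink.api.common.functions.FoldFunction

// O is the input element type, T the accumulator type.
class MyFoldFunction[O, T] extends FoldFunction[O, T] {
  // Set on the first fold() call so the setup runs once per function instance.
  private var initialized = false

  override def fold(accumulator: T, value: O): T = {
    if (!initialized) {
      doInitStuff() // placeholder: one-time setup
      initialized = true
    }
    doNormalStuff() // placeholder: must return the updated accumulator of type T
  }
}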
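
The lazy-initialisation pattern above can also be written with a `@transient lazy val`, sketched below in a form that runs without Flink on the classpath. Everything here is an assumption made for illustration: `FoldFunction` is a local stand-in trait with the same `fold(accumulator, value)` shape as Flink's interface, and `ScriptStub`, `LazyInitFold` and `LazyInitDemo` are hypothetical names. The idea is that the resource is not serialized with the function object and is built on the first `fold` call instead.

```scala
// Local stand-in mirroring the shape of Flink's FoldFunction[O, T].
// It is NOT the real interface; it only lets the sketch run on its own.
trait FoldFunction[O, T] extends Serializable {
  def fold(accumulator: T, value: O): T
}

// Hypothetical resource that is costly to build and not serializable,
// e.g. a script engine. The counter exists only for demonstration.
object ScriptStub { var created: Int = 0 }

class ScriptStub {
  ScriptStub.created += 1
  def eval(acc: Long, value: Long): Long = acc + value
}

class LazyInitFold extends FoldFunction[Long, Long] {
  // @transient: the field is skipped when the function object is serialized.
  // lazy: the resource is built on first access, i.e. on the first fold() call.
  @transient private lazy val script = new ScriptStub

  override def fold(accumulator: Long, value: Long): Long =
    script.eval(accumulator, value)
}

object LazyInitDemo {
  // Folds three values with one function instance.
  def run(): Long = {
    val f = new LazyInitFold
    Seq(1L, 2L, 3L).foldLeft(0L)(f.fold)
  }
}
```

Calling `LazyInitDemo.run()` folds three values while constructing `ScriptStub` only once. The sketch covers initialisation only; nothing in it closes the resource when the job stops.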
Hi,

I have a few questions related to Flink streaming. I am on 1.2-SNAPSHOT, and what I would like to accomplish is a stream that reads data from multiple Kafka topics, identifies user sessions, uses an external user profile to enrich the data, evaluates a script to produce session aggregates, and then creates updated profiles from the session aggregates. I am working with high-volume data and user sessions may be long, so using the generic window apply might not work. Below is a simplified version of the stream.

    stream = createKafkaStreams(...);
    env.setParallelism(4);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    stream.keyBy(2)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .fold(new SessionData(), new SessionFold(), new ProfilerApply())
        .print();

The questions:

1. Initially, when I used event time windowing, I could not get any of my windows to close. The reason seemed to be that I had 6 partitions in my test Kafka setup and only 4 of them generated traffic. If I used a parallelism above 4, no windows were closed. Is this by design or a defect? We use flink-connector-kafka-0.10 because earlier versions did not commit the offsets correctly.

2. Rich fold functions are not supported. However, I would like to execute a piece of custom script in the fold function, and that requires an initialisation step. I would have used the open and close lifecycle methods of rich functions, but they are not available in fold. What would be the preferred way to run initialisation routines (and to close them gracefully) when using fold?

3. Somewhat related to the above: I would also like to fetch a user profile from an external source at the beginning of the session. What would be the best practice for that kind of operation? If I were using the generic window apply, I could fetch it at the beginning of the apply method. I was thinking of introducing a mapper that fetches this profile periodically and caches it in Flink state. However, with this setup I would not be able to tie it to the user sessions identified for the windows.

4. I may also have an additional requirement to write out each event enriched with the current session and profile data. I could do this with the generic window function by writing out each event with the collector while iterating, but would there be a better pattern to use? Maybe sharing state between functions or something similar.

Br,
Henri H
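
The behaviour in question 1 can be pictured with a small model. It assumes that an operator's event-time clock is the minimum of the latest watermarks received on its input channels, and that an event-time window fires once that clock reaches the window's maximum timestamp. `WatermarkDemo` and its methods are hypothetical names for illustration, not Flink API, and the numbers are made up.

```scala
object WatermarkDemo {
  // Assumption: the operator's watermark is the minimum over all input channels.
  def operatorWatermark(channelWatermarks: Seq[Long]): Long =
    channelWatermarks.min

  // Assumption: a window fires once the operator watermark has reached
  // the window's maximum timestamp.
  def canFire(windowMaxTimestamp: Long, channelWatermarks: Seq[Long]): Boolean =
    operatorWatermark(channelWatermarks) >= windowMaxTimestamp

  // Four channels carry traffic and have advanced their watermark.
  val active: Seq[Long] = Seq.fill(4)(100000L)

  // Two channels never saw an event, so they never emitted a watermark.
  val idle: Seq[Long] = Seq.fill(2)(Long.MinValue)

  // With idle channels the minimum stays at Long.MinValue: the window stays open.
  val firesWithIdleInputs: Boolean = canFire(50000L, active ++ idle)

  // With only active channels the minimum advances: the window can fire.
  val firesWithActiveOnly: Boolean = canFire(50000L, active)
}
```

Under these assumptions, a single input that never emits a watermark is enough to keep every window open, which would match what was observed with a parallelism above 4.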