Hi,

How can I maintain local state, for instance a ConcurrentHashMap, across different steps in a streaming chain on a single machine/process? A static variable? (That doesn't seem to work well when running locally, as it gets shared across multiple instances; a common per-pipeline store would be helpful.)

Is it OK to checkpoint such local state in a single map operation at the beginning of the pipeline, or does it need to be done in every function?

Will multiple groupBy steps using the same key selector send their output to the same machines (to preserve data locality)? And how can I do a fold/reduce operation that only returns its result after a full window has been processed, even when the processing in the window includes streams that have been distributed and merged from different machines using groupBy?

My scenario is as follows: I want to build up and partition a large state across different machines by using groupBy on a stream. The processing occurs in a window, and some of it needs to happen on multiple machines, so I want to use additional groupBy operators to pass partial results to other machines. Pseudocode:

    flattenedWindowStream = streamSource
        .groupBy(myKeySelector)                // initial partitioning
        .map(localStateSaverCheckpointMapper)  // checkpoint that saves local state, just passes the data through
        .window(Count.of(100))
        .flatten();

    localAndRemoteStream = flattenedWindowStream
        .split(event -> canBeProcessedLocally(event) ? "local" : "remote");

    remoteStream = localAndRemoteStream.select("remote")
        .map(partialProcessing)  // partially process what I can with my local state
        .groupBy(myKeySelector)  // send the partial results to the machines that own the rest of the data
        .map(process);

    // Broadcast all fully processed results to all machines
    globalResult = localAndRemoteStream.select("local")
        .map(process)
        .union(remoteStream)
        .broadcast();

    // fold/reduce; I want a result based on the full contents of the window
    globalResult.fold().addSink(globalWindowOutputSink);

Any help would be greatly appreciated!

Thanks,
William
Hi!

What you can do is have a static "broker", which is essentially a map from subtaskIndex to your ConcurrentHashMap. All tasks in a pipeline that have the same subtaskIndex will then grab the same ConcurrentHashMap. You can get the subtask index in a RichFunction (such as a RichMapFunction) by calling getRuntimeContext().getIndexOfThisSubtask().

You can use org.apache.flink.runtime.iterative.concurrent.Broker (part of flink-runtime) for that; we use it to hand over shared control structures in iterations. The key would be the subtaskIndex.

Bear in mind that, for this to work well with checkpointing, the ConcurrentHashMap needs to be consistently "owned" by one of the tasks, while the other tasks only retrieve references to it.

Greetings,
Stephan
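For concreteness, here is a minimal sketch of that broker pattern using a hand-rolled static map instead of Flink's internal Broker class. The class name LocalStateMapper, the String element type, and the shape of the state map are made-up placeholders for illustration, not Flink API:

    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical pass-through mapper sharing one state map per subtask index.
    public class LocalStateMapper extends RichMapFunction<String, String> {

        // One store per JVM: subtaskIndex -> the shared state for that pipeline slot.
        private static final ConcurrentHashMap<Integer, ConcurrentHashMap<String, String>>
                BROKER = new ConcurrentHashMap<>();

        private transient ConcurrentHashMap<String, String> localState;

        @Override
        public void open(Configuration parameters) {
            int subtask = getRuntimeContext().getIndexOfThisSubtask();
            // All rich functions running with the same subtask index get the same map.
            localState = BROKER.computeIfAbsent(subtask,
                    k -> new ConcurrentHashMap<String, String>());
        }

        @Override
        public String map(String value) {
            localState.put(value, value); // example: record the element in shared state
            return value;                 // pass the element through unchanged
        }
    }

Per Stephan's caveat, checkpointing would still require one task to own the map while the others only hold references; the sketch only shows how co-located tasks find the same instance.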
Hey,

As for the second part of your question: if you want to apply transformations such as reduce on windows, you need to create a windowed data stream and apply your groupBy/reduce transformations on this WindowedDataStream before calling .flatten(). A plain stream.window(..).flatten() has no effect, as flatten() turns the windowed stream straight back into a simple DataStream. I suggest reading through the windowing section of the programming guide. I also attached a picture illustrating some windowing basics.

I hope this helps :) Let me know if you have further questions.

Regards,
Gyula
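To make that concrete, here is a hedged sketch against the 0.9-era windowing API this thread refers to (window, Count.of, reduceWindow, flatten); Event, streamSource, myKeySelector, and combine(...) are placeholders carried over from the pseudocode above:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.windowing.helper.Count;

    DataStream<Event> perWindowResult = streamSource
        .window(Count.of(100))        // now a WindowedDataStream
        .groupBy(myKeySelector)       // group within each window
        .reduceWindow(new ReduceFunction<Event>() {
            @Override
            public Event reduce(Event a, Event b) {
                return combine(a, b); // one result per group per full window
            }
        })
        .flatten();                   // only now turn it back into a DataStream

    // Without a transformation in between, window(Count.of(100)).flatten()
    // is a no-op: flatten() immediately undoes the windowing.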