Working with data locality in streaming using groupBy?


Working with data locality in streaming using groupBy?

William Saar

Hi,

How can I maintain local state, for instance a ConcurrentHashMap, across different steps of a streaming chain on a single machine/process?

A static variable? (This doesn't seem to work well when running locally, as it gets shared across multiple instances; a common "pipeline" store would be helpful.)

Is it OK to checkpoint such local state in a single map operation at the beginning of the pipeline, or does it need to be done in every function?

Will multiple groupBy steps that use the same key selector send their output to the same machines? (To preserve data locality.)

How can I do a fold/reduce operation that only returns its result after a full window has been processed, even when the processing in the window includes streams that have been distributed and merged from different machines using groupBy?

My scenario is as follows:

I want to build up and partition a large state across different machines by using groupBy on a stream. The processing happens in a window, and some of it needs to be done on multiple machines, so I want to add further groupBy operations to pass partial results to other machines. Pseudo code:

flattenedWindowStream = streamSource
    .groupBy(myKeySelector)                 // initial partitioning
    .map(localStateSaverCheckpointMapper)   // checkpoints local state, just passes the data through
    .window(Count(100))
    .flatten();

localAndRemoteStream = flattenedWindowStream
    .split(event -> canBeProcessedLocally(event) ? "local" : "remote");

remoteStream = localAndRemoteStream.select("remote")
    .map(partialProcessing)    // partially process what I can with my local state
    .groupBy(myKeySelector)    // send the partial results to the machines that own the rest of the data
    .map(process);

globalResult = localAndRemoteStream.select("local")
    .map(process)
    .union(remoteStream)
    .broadcast();              // broadcast all fully processed results to all machines

globalResult.fold().addSink(globalWindowOutputSink);  // fold/reduce; I want a result based on the full contents of the window

 

Any help would be greatly appreciated!

Thanks,

William


Re: Working with data locality in streaming using groupBy?

Stephan Ewen
Hi!

What you can do is have a static "broker", which is essentially a map from subtaskIndex to your ConcurrentHashMap. All tasks in a pipeline that share the same subtaskIndex will then grab the same ConcurrentHashMap.

You can grab the subtask index if you have a RichFunction (such as RichMapFunction) and call getRuntimeContext().getIndexOfThisSubtask().

You can use the "org.apache.flink.runtime.iterative.concurrent.Broker" (part of "flink-runtime") for that. We use it to hand over shared control structures in iterations. The key would be the subtaskIndex.

Bear in mind that, in order for this to work well with the checkpointing, you will need to have the ConcurrentHashMap "owned" consistently by one of the tasks, while the other tasks retrieve references.
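
For illustration, here is a minimal sketch of the static-broker idea, using a plain static map instead of the runtime's Broker class; the class and field names (LocalStateBroker, StateSavingMapper) and the String/Long state type are made up for the example, and the checkpointing concern above is not addressed:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Hypothetical static "broker": subtask index -> the shared state of that pipeline instance.
class LocalStateBroker {
    private static final Map<Integer, ConcurrentHashMap<String, Long>> STATES =
            new ConcurrentHashMap<>();

    // Every function that asks with the same subtask index gets the same map back.
    static ConcurrentHashMap<String, Long> forSubtask(int subtaskIndex) {
        return STATES.computeIfAbsent(subtaskIndex, i -> new ConcurrentHashMap<>());
    }
}

// Example of a function in the chain grabbing the shared map in open().
class StateSavingMapper extends RichMapFunction<String, String> {
    private transient ConcurrentHashMap<String, Long> localState;

    @Override
    public void open(Configuration parameters) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        localState = LocalStateBroker.forSubtask(subtask);
    }

    @Override
    public String map(String value) {
        localState.merge(value, 1L, Long::sum);  // update the shared per-subtask state
        return value;                            // pass the element through unchanged
    }
}

Any other rich function in the same pipeline can call LocalStateBroker.forSubtask(getRuntimeContext().getIndexOfThisSubtask()) in its open() method and will see the same map, as long as the functions run with the same parallelism.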

Greetings,
Stephan







Re: Working with data locality in streaming using groupBy?

Gyula Fóra
In reply to this post by William Saar
Hey,

As for the second part of your question:

If you want to apply transformations such as reduce on windows, you need to create a windowed data stream and apply your groupBy and reduce transformations on that WindowedDataStream before calling .flatten().

stream.window(..).flatten() on its own has no effect, as flatten() turns the windowed stream straight back into a plain DataStream.
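
In terms of your pseudo code, that would look roughly like the sketch below; myWindowReducer is a placeholder for whatever window reduce function you need, and reduceWindow stands for the window transformation on WindowedDataStream (reduceWindow/mapWindow, if I recall the names correctly):

windowResults = streamSource
    .groupBy(myKeySelector)           // partition by key first
    .window(Count(100))               // window the grouped stream
    .reduceWindow(myWindowReducer)    // the reduce is evaluated on the window contents
    .flatten();                       // only now go back to a plain DataStream of window results

Everything you want computed per window has to happen between window(...) and flatten(); once you call flatten() you are back on an element-by-element DataStream.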

I suggest reading through the windowing section of the programming guide.

I also attached a picture illustrating some windowing basics.


I hope this helps :) Let me know if you have further questions.

Regards,
Gyula
