Implement a bunch of transformations applied to the same source stream in Apache Flink in parallel and combine the results


Andrey Salnikov
Hi!

Could you please help me? I'm trying to use Apache Flink for machine learning tasks with external ensemble/tree libs like XGBoost, so my workflow looks like this:

  • receive a single stream of data whose atomic event looks like a simple vector event=(X1, X2, X3...Xn); it can be imagined as POJO fields, so initially we have DataStream<Event> source = ...
  • apply a lot of feature extraction code to the same event source: feature1 = source.map(X1...Xn), feature2 = source.map(X1...Xn), etc. For simplicity, let DataStream<Integer> feature(i) = source.map() for all features (see the simplified sketch after this list)
  • then create a vector with the extracted features (feature1, feature2, ...featureK); for now it will be 40-50 features, but I'm sure it will grow and can easily reach 100-500 features or more
  • put these extracted features into dataset/table columns over 10-minute windows and run the final machine learning task on each 10 minutes of data
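
Simplified, the setup so far looks roughly like this (a sketch only; Event, MyEventSource and the extractFeatureN helpers are placeholders for my real code):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// "Event" and "MyEventSource" stand in for my real POJO and source.
DataStream<Event> source = env.addSource(new MyEventSource());

DataStream<Integer> feature1 = source.map(new MapFunction<Event, Integer>() {
    @Override
    public Integer map(Event e) { return extractFeature1(e); } // placeholder extractor
});
DataStream<Integer> feature2 = source.map(new MapFunction<Event, Integer>() {
    @Override
    public Integer map(Event e) { return extractFeature2(e); } // placeholder extractor
});
// ...featureK, and then I need to combine (feature1, ..., featureK) per event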

In simple words, I need to apply several quite different map operations to the same single event in the stream and then combine the results from all the map functions into a single vector.

So far I can't figure out how to implement the final reduce step and run all the feature extraction map jobs in parallel, if possible. I've spent several days on the Flink docs site, YouTube videos, googling, and reading Flink's sources, but it seems I'm really stuck.

The easy solution would be to use a single map operation and run each feature extraction sequentially, one by one, in a huge map body, then return the final vector (Feature1...FeatureK) for each input event. But that seems crazy and non-optimal.
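
For reference, that single-map approach would look roughly like this (the extractFeatureN helpers are placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// One big map that computes every feature sequentially per event.
DataStream<double[]> vectors = source.map(new MapFunction<Event, double[]>() {
    @Override
    public double[] map(Event e) {
        double[] v = new double[50];  // 40-50 features for now
        v[0] = extractFeature1(e);    // placeholder extractors
        v[1] = extractFeature2(e);
        // ...and so on up to v[49]
        return v;
    }
});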

Another solution is, for each pair of features, to use a join, since all feature DataStreams have the same initial event and the same key and only apply some transformation code. But it looks ugly: writing 50 joins with some window. And I think joins and coGroups were designed for joining different streams from different sources, not for such map/reduce operations.

It seems to me there should be something simple for all these map operations that I'm missing.

Could you please point me to how you implement such tasks in Flink, ideally with a code example?

PS: I posted this question to Stack Overflow.
PPS: If I use feature1.union(feature2...featureK), I still need to somehow separate and combine the feature vectors before the sink, and preserve the order of the final vectors.

Thanks,
Andrey

Re: Implement a bunch of transformations applied to the same source stream in Apache Flink in parallel and combine the results

Piotr Nowojski
Hi,

What is the number of events per second that you wish to process? If it's high enough (~ number of machines * number of cores), you should be just fine: instead of scaling with the number of features, scale with the number of events. If you have a single data source, you can still randomly shuffle the events before applying your transformations.
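
For example, something like this (a sketch; Event and extractAllFeatures are placeholders for your types and code):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Randomly redistribute single events over all parallel instances,
// then compute the whole feature vector per event in one operator.
DataStream<double[]> vectors = source
    .shuffle()
    .map(new MapFunction<Event, double[]>() {
        @Override
        public double[] map(Event e) {
            return extractAllFeatures(e); // placeholder that runs all K extractors
        }
    });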

Another solution might be to:
1. Assign a unique eventId and split the original event using flatMap into tuples: <featureId, Xi, eventId>
2. keyBy featureId, eventId (or maybe do random partitioning with shuffle?)
3. perform transformation
4. keyBy eventId, ….
5. Window and reduce

But that would add more overhead compared to processing more events at the same time. A rough sketch of those five steps follows.
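
Something like the following sketch, assuming your Event POJO already carries a unique eventId, numFeatures is the number of features, and computeFeature(featureId, event) is a placeholder that dispatches to the right extraction code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

final int numFeatures = 50; // assumption: K features

// Step 1: fan each event out into (featureId, eventId, event) tuples.
// (The whole event is carried along here instead of a single Xi, for simplicity.)
DataStream<Tuple3<Integer, Long, Event>> exploded = source
    .flatMap(new FlatMapFunction<Event, Tuple3<Integer, Long, Event>>() {
        @Override
        public void flatMap(Event e, Collector<Tuple3<Integer, Long, Event>> out) {
            for (int featureId = 0; featureId < numFeatures; featureId++) {
                out.collect(Tuple3.of(featureId, e.eventId, e));
            }
        }
    });

// Steps 2 + 3: key by featureId so different features can be computed
// on different slots, then run the actual transformation.
DataStream<Tuple3<Integer, Long, Double>> featureValues = exploded
    .keyBy(0)
    .map(new MapFunction<Tuple3<Integer, Long, Event>, Tuple3<Integer, Long, Double>>() {
        @Override
        public Tuple3<Integer, Long, Double> map(Tuple3<Integer, Long, Event> t) {
            return Tuple3.of(t.f0, t.f1, computeFeature(t.f0, t.f2));
        }
    });

// Steps 4 + 5: key by eventId and reassemble the K feature values of
// each event into one vector, using a count window of size K.
DataStream<double[]> vectors = featureValues
    .keyBy(1)
    .countWindow(numFeatures)
    .apply(new WindowFunction<Tuple3<Integer, Long, Double>, double[], Tuple, GlobalWindow>() {
        @Override
        public void apply(Tuple key, GlobalWindow window,
                          Iterable<Tuple3<Integer, Long, Double>> values,
                          Collector<double[]> out) {
            double[] vector = new double[numFeatures];
            for (Tuple3<Integer, Long, Double> v : values) {
                vector[v.f0] = v.f2; // each feature at its own index
            }
            out.collect(vector);
        }
    });

The count window of size numFeatures simply waits until all K feature values of an event have arrived before emitting the assembled vector.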

Piotrek
