Parallelize an incoming stream into 5 streams with the same data


Vijay Balakrishnan
Hi,
I need to broadcast/parallelize an incoming stream (inputStream) into 5 streams that all carry the same data. Each stream is keyed by a different key so that I can run different grouping operations on the same data.

Do I just call inputStream.keyBy(...) once per key (5 different keys) and then use each resulting stream to perform the windowing/grouping operations?

DataStream<Long> inputStream = ...
DataStream<Long> keyBy1 = inputStream.keyBy((d) -> d._1);
DataStream<Long> keyBy2 = inputStream.keyBy((d) -> d._2);

DataStream<Long> out1Stream = keyBy1.flatMap(new Key1Function()); // do windowing/grouping operations in this function
DataStream<Long> out2Stream = keyBy2.flatMap(new Key2Function()); // do windowing/grouping operations in this function

out1Stream.print();
out2Stream.addSink(new Out2Sink());

Will this work?

Or do I connect each keyed stream to a broadcast stream, like this:

BroadcastStream<Long> broadCastStream = inputStream.broadcast(..);
DataStream out1Stream = keyBy1.connect(broadCastStream)
    .process(new KeyedBroadcastProcessFunction...)

DataStream out2Stream = keyBy2.connect(broadCastStream)
    .process(new KeyedBroadcastProcessFunction...)

Or do I need to use split:

SplitStream<Long> source = inputStream.split(new MyOutputSelector());
source.select("").flatMap(new Key1Function()).addSink(out1Sink);
source.select("").flatMap(new Key2Function()).addSink(out2Sink);


static final class MyOutputSelector implements OutputSelector<Long> {
    List<String> outputs = new ArrayList<String>();

    public Iterable<String> select(Long value) {
        outputs.add("");
        return outputs;
    }
}
TIA,

Re: Parallelize an incoming stream into 5 streams with the same data

Hequn Cheng
Hi Vijay,

Option 1 is the right answer. `keyBy1` and `keyBy2` each contain all the data in `inputStream`.
Option 2, by contrast, replicates all the data to every parallel task, and option 3 splits the data into smaller groups without duplication.

Best, Hequn
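
To make option 1 concrete, here is a minimal plain-Java sketch (not Flink code) of what it means for each keyed stream to contain all the data: the same complete input is grouped twice, once per key, and each grouping sees every record. The `Event` record, its fields `component` and `host`, and the `sumBy` helper are hypothetical stand-ins for the `d._1` / `d._2` keys in the question.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

final class KeyedFanOut {
    // Hypothetical element type; the thread only refers to the key fields as d._1 and d._2.
    record Event(String component, String host, long value) {}

    // Groups all events by the given key and sums their values per key.
    static <K extends Comparable<K>> Map<K, Long> sumBy(List<Event> events, Function<Event, K> key) {
        return events.stream()
                .collect(Collectors.groupingBy(key, TreeMap::new, Collectors.summingLong(Event::value)));
    }

    // Small fixed input used by main.
    static List<Event> sample() {
        return List.of(
                new Event("cpu", "h1", 1),
                new Event("cpu", "h2", 2),
                new Event("mem", "h1", 3));
    }

    public static void main(String[] args) {
        List<Event> input = sample();
        // Both groupings read the same, complete input; nothing is split between them.
        System.out.println(sumBy(input, Event::component)); // prints {cpu=3, mem=3}
        System.out.println(sumBy(input, Event::host));      // prints {h1=4, h2=2}
    }
}
```

In a Flink job, the two `sumBy` calls would correspond to two separate keyBy pipelines built on the same `inputStream`, as in option 1 of the question.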

(In reply to Vijay Balakrishnan's message of Fri, Oct 26, 2018, 7:01 AM.)

Re: Parallelize an incoming stream into 5 streams with the same data

Vijay Balakrishnan
Thanks, Hequn.
If I have to do a tumbling window operation like:
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
I am not able to do that on the output of keyBy(..), which is a KeyedStream.
I was hoping to group by (key._1, key._2) etc., then do a tumbling window operation on the KeyedStream,
and then perform a group operation on the resulting set to get the total count etc.
I am able to do only one of keyBy or timeWindowAll, as follows:

.keyBy(d._1,d._2)
.process(new KeyProcessing(FIVE_SECONDS, "componentOperation"))
OR
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))

Chaining them like this doesn't seem to help, because the keying from keyBy is lost in the next step:
.keyBy(d._1,d._2)
.timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.of(FIVE, TimeUnit.SECONDS))
.process(new WindowProcessing(FIVE_SECONDS))

TIA,
Vijay


(In reply to Hequn Cheng's message of Thu, Oct 25, 2018, 6:31 PM.)

Re: Parallelize an incoming stream into 5 streams with the same data

Hequn Cheng
Hi Vijay,

> I was hoping to groupBy(key._1,key._2) etc and then do a tumblingWindow operation on the KeyedStream and then perform group operation on the resultant set to get total count etc.

From your description, I think you can apply a keyed tumbling window first, something like:
// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);
Then you can apply a windowAll to the output of that keyed window to get the final total count.

Best,
Hequn
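
The two-stage idea above can be modeled in plain Java (not Flink code) to show what each stage computes: first a count per key inside each 5-second tumbling window, then a sum of those per-key counts per window. The `Event` and `KeyedCount` records, the helper names, and the sample timestamps are all hypothetical, and window boundaries are assumed to be aligned to multiples of the window size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingCounts {
    // Hypothetical element: a key plus a timestamp in milliseconds.
    record Event(String key, long timestampMs) {}

    // Output of the keyed stage: one count per (window start, key).
    record KeyedCount(long windowStartMs, String key, long count) {}

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Stage 1: count events per key within each tumbling window (the keyed window step).
    static List<KeyedCount> countPerKey(List<Event> events, long sizeMs) {
        Map<Long, Map<String, Long>> perWindow = new TreeMap<>();
        for (Event e : events) {
            perWindow.computeIfAbsent(windowStart(e.timestampMs(), sizeMs), w -> new TreeMap<>())
                     .merge(e.key(), 1L, Long::sum);
        }
        List<KeyedCount> out = new ArrayList<>();
        perWindow.forEach((start, counts) ->
                counts.forEach((key, count) -> out.add(new KeyedCount(start, key, count))));
        return out;
    }

    // Stage 2: add up the per-key counts of each window (the windowAll step).
    static Map<Long, Long> totalPerWindow(List<KeyedCount> counts) {
        Map<Long, Long> totals = new TreeMap<>();
        for (KeyedCount c : counts) {
            totals.merge(c.windowStartMs(), c.count(), Long::sum);
        }
        return totals;
    }

    // Small fixed input used by main.
    static List<Event> sample() {
        return List.of(
                new Event("a", 1000), new Event("b", 2000), new Event("a", 4000),
                new Event("a", 4999), new Event("a", 6000), new Event("b", 9000),
                new Event("b", 9500));
    }

    public static void main(String[] args) {
        List<KeyedCount> perKey = countPerKey(sample(), 5000);
        perKey.forEach(System.out::println);
        System.out.println(totalPerWindow(perKey)); // prints {0=4, 5000=3}
    }
}
```

The per-key results stay available after stage 1, so the grouping is not lost; stage 2 only adds a total on top of them.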



(In reply to Vijay Balakrishnan's message of Fri, Nov 2, 2018, 6:20 AM.)

Re: Parallelize an incoming stream into 5 streams with the same data

Vijay Balakrishnan
Cool, thanks, Hequn! I will try that approach.

Vijay

(In reply to Hequn Cheng's message of Thu, Nov 1, 2018, 8:18 PM.)