Fan out a parallelizable operator's subtasks beyond the total number of slots

Chen Qin
Hi there,


I'm trying to run a large number of subtasks within a single task slot by using a slot sharing group. The use case is an operator that makes a high-latency network call but has a small memory and CPU footprint (sample code below).

From the documentation, `slotSharingGroup` seemed like the place to look, yet it does not seem to be designed for this scenario.

My question is: what is the best way to fan out a large number of parallel subtasks within a single task slot?

public void testFanOut() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    env.addSource(...)
        .setParallelism(1)
        .disableChaining()
        .shuffle()
        .flatMap(new FlatMapFunction<DummyFlinkRecord, Long>() {
            @Override
            public void flatMap(DummyFlinkRecord dummyFlinkRecord, Collector<Long> collector) throws Exception {
                Thread.sleep(1000); // simulates a high-latency network call, hence the need to fan out
                collector.collect(1L);
            }
        })
        // puts the flatMap (and downstream operators that inherit the group) in its own slot sharing group
        .slotSharingGroup("flatmap")
        .setParallelism(100)
        .rebalance()
        .filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long aLong) throws Exception {
                return true;
            }
        })
        .setParallelism(10)
        .addSink(new SinkFunction<Long>() {
            @Override
            public void invoke(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
    env.execute("fan out 100 subtasks for 1s delay mapper");
}
Thanks,
Chen Qin

Re: Fan out a parallelizable operator's subtasks beyond the total number of slots

Till Rohrmann
Hi Chen,

Two subtasks of the same operator can never be executed within the same slot/pipeline. `slotSharingGroup` only lets you control which subtasks of different operators may run alongside each other in the same slot; essentially, it lets you break pipelines into smaller ones. Therefore, you need at least as many slots as the maximum degree of parallelism in your program (in your case 100, the parallelism of the flatMap).
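To make the slot arithmetic concrete, here is a small Java sketch of the rule as it is commonly described for Flink: each slot sharing group needs as many slots as the highest parallelism among its operators, and separate groups do not share slots, so the job needs the sum over all groups. This is a simplified model, not Flink's scheduler code; the `Op` class and `minSlots` method are hypothetical. For the pipeline in the question, the source stays in the `default` group, while the filter and sink inherit the `flatmap` group from their input. The sink's parallelism is not set in the snippet, so it uses the environment default; the sketch assumes 10 (any value up to 100 gives the same result).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlotEstimate {

    // Hypothetical description of one operator: name, slot sharing group, parallelism.
    public static final class Op {
        final String name;
        final String group;
        final int parallelism;

        public Op(String name, String group, int parallelism) {
            this.name = name;
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    // Simplified model: each group needs max(parallelism) slots; groups do not share slots.
    public static int minSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(op.group, op.parallelism, Math::max);
        }
        int total = 0;
        for (int slots : maxPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Op> job = Arrays.asList(
                new Op("source", "default", 1),     // no group set: stays in "default"
                new Op("flatMap", "flatmap", 100),  // slotSharingGroup("flatmap")
                new Op("filter", "flatmap", 10),    // inherits "flatmap" from its input
                new Op("sink", "flatmap", 10));     // assumed parallelism; inherits "flatmap"
        System.out.println(minSlots(job)); // prints 101
    }
}
```

Under these assumptions the estimate is 101 slots (1 for `default` plus 100 for `flatmap`), consistent with needing at least as many slots as the highest parallelism (100). Putting everything in one group would bring it down to 100.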

Cheers,
Till
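Since subtasks of the same operator cannot share a slot, one way to get more concurrent network calls than slots is to issue them concurrently inside a single subtask. The sketch below shows that idea with a plain `java.util.concurrent` thread pool rather than any Flink API; `slowCall` is a hypothetical stand-in for the high-latency call (simulated with `Thread.sleep`, like the original test) and `processBatch` is a hypothetical helper. It only illustrates the concurrency idea: turning it into an operator (buffering incoming records, emitting results only from the task thread, checkpointing in-flight requests) is not shown. Later Flink releases (1.2 onward) added an Async I/O API, `AsyncDataStream`, aimed at this pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FanOutWithinSubtask {

    // Hypothetical stand-in for the high-latency network call; like the original
    // test it only sleeps. It echoes its input so result order can be checked.
    static Long slowCall(long input, long latencyMillis) throws InterruptedException {
        Thread.sleep(latencyMillis);
        return input;
    }

    // Hypothetical helper: runs one slowCall per input with up to `concurrency`
    // calls in flight and returns the results in input order. Results are gathered
    // on the calling thread, so a caller such as an operator could emit them itself.
    public static List<Long> processBatch(List<Long> inputs, int concurrency, long latencyMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<Long>> pending = new ArrayList<>();
            for (long input : inputs) {
                Callable<Long> call = () -> slowCall(input, latencyMillis);
                pending.add(pool.submit(call));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> f : pending) {
                results.add(f.get()); // blocks until that particular call completes
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> inputs = new ArrayList<>();
        for (long i = 0; i < 100; i++) {
            inputs.add(i);
        }
        long start = System.nanoTime();
        List<Long> results = processBatch(inputs, 100, 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(results.size() + " results in " + elapsedMs + " ms");
    }
}
```

With 100 calls in flight, the 100 simulated 200 ms calls should finish in well under a second instead of the roughly 20 s a sequential loop would take; exact timings depend on the machine.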

In reply to Chen Qin's message of Sun, Apr 17, 2016 at 6:54 PM (the first message in this thread).