Hello,
I have a Flink streaming job as follows:

    DataStream<String> messageStream = env
        .addSource(new FlinkKafkaConsumer082(
            flinkParams.getRequired("topic"),
            new SimpleStringSchema(),
            flinkParams.getProperties()))
        .setParallelism(5);

    DataStream<Tuple7<String, String, String, String, String, String, String>> messageStream2 = messageStream
        .rebalance()
        .flatMap(new Operator1())
        .setParallelism(10);

    DataStream<Tuple7<String, String, String, String, String, String, String>> messageStream3 = messageStream2
        .rebalance()
        .filter(new Operator2())
        .setParallelism(20);

    DataStream<Tuple2<String, String>> messageStream4 = messageStream3
        .<Tuple2<String, String>>project(2, 5)
        .setParallelism(20);

    DataStream<Tuple3<String, String, String>> messageStream5 = messageStream4
        .flatMap(new Operator3())
        .setParallelism(20)
        .groupBy(0);

    messageStream5
        .flatMap(new Operator4())
        .setParallelism(20);

    env.execute();

When I submit the job, the number of task slots that get used (displayed on the UI) is only 20. Why is that? The total number of tasks listed on the UI is 55.

And also, why does filter->project->flatMap get compressed into one operator with a parallelism of 20? Can I not set the individual operators (i.e. filter and project) to have an individual parallelism of 20?

Thanks for the help!

Best,
Jerry
Hey Jerry,

On Wed, Oct 21, 2015 at 11:11 PM, Jerry Peng <[hidden email]> wrote:
> When I submit the job, the number of task slots that gets used

Do you mean the number of task slots is 55 (you just wrote tasks)? Each task slot runs a pipeline of parallel subtasks. In your case, the number of used task slots corresponds to the maximum parallelism of the job, which is 20. You can have a look at [1]. There is a figure giving an example.

> And also why does the

This is an optimisation that drastically reduces the overhead of the data exchange between operators. It skips serialisation and results in a simple chain of local method calls. This is possible because the filter, project, and flatMap operators have the same parallelism and just forward their data to the next operator. You can disable it via env.disableOperatorChaining().

Does this help?

– Ufuk
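To make the arithmetic behind "20 slots, 55 tasks" concrete, here is a small plain-Java model of chaining. It is only a sketch under simplifying assumptions, not Flink's actual JobGraph code: it assumes an operator is chained to its predecessor only when chaining is enabled, the edge is a plain forward (no rebalance() or groupBy() in between), and both sides have the same parallelism. The class and method names (ChainingModel, chains, countSubtasks, jerrysJob) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of operator chaining (an illustration, not Flink's JobGraph code).
// Assumption: an operator joins its predecessor's chain only if chaining is enabled,
// the edge is a plain forward, and both have the same parallelism.
class ChainingModel {

    static final class Op {
        final String name;
        final int parallelism;
        final boolean forwardFromPrevious; // false after rebalance() or groupBy()

        Op(String name, int parallelism, boolean forwardFromPrevious) {
            this.name = name;
            this.parallelism = parallelism;
            this.forwardFromPrevious = forwardFromPrevious;
        }
    }

    // Groups a linear pipeline of operators into chains; each chain becomes one task.
    static List<List<Op>> chains(List<Op> ops, boolean chainingEnabled) {
        List<List<Op>> result = new ArrayList<>();
        for (Op op : ops) {
            if (!result.isEmpty()) {
                List<Op> last = result.get(result.size() - 1);
                Op prev = last.get(last.size() - 1);
                if (chainingEnabled && op.forwardFromPrevious
                        && op.parallelism == prev.parallelism) {
                    last.add(op); // same thread as prev: a local method call, no serialisation
                    continue;
                }
            }
            List<Op> chain = new ArrayList<>();
            chain.add(op);
            result.add(chain);
        }
        return result;
    }

    // Each chain runs as `parallelism` parallel subtasks; the UI lists all of them.
    static int countSubtasks(List<List<Op>> chains) {
        int total = 0;
        for (List<Op> chain : chains) {
            total += chain.get(0).parallelism;
        }
        return total;
    }

    // Jerry's pipeline, transcribed by hand from the job in the question.
    static List<Op> jerrysJob() {
        List<Op> ops = new ArrayList<>();
        ops.add(new Op("Source: Kafka", 5, false));
        ops.add(new Op("flatMap(Operator1)", 10, false)); // after rebalance()
        ops.add(new Op("filter(Operator2)", 20, false));  // after rebalance()
        ops.add(new Op("project(2, 5)", 20, true));
        ops.add(new Op("flatMap(Operator3)", 20, true));
        ops.add(new Op("flatMap(Operator4)", 20, false)); // after groupBy(0)
        return ops;
    }

    public static void main(String[] args) {
        for (boolean enabled : new boolean[] {true, false}) {
            List<List<Op>> c = chains(jerrysJob(), enabled);
            System.out.println("chaining " + (enabled ? "on" : "off") + ": "
                    + c.size() + " tasks, " + countSubtasks(c) + " subtasks");
        }
    }
}
```

With chaining on, the model yields 4 tasks and 5 + 10 + 20 + 20 = 55 parallel subtasks, which is the number Jerry saw on the UI; filter, project, and flatMap(Operator3) collapse into one task. With chaining off it yields 6 tasks and 95 subtasks, while the maximum parallelism, and hence the slot count under slot sharing, stays at 20.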
Hi!

The bottom of this page also has an illustration of how tasks map to task slots: https://ci.apache.org/projects/flink/flink-docs-release-0.9/setup/config.html

There are two optimizations involved:

(1) Chaining: Here, sources, mappers, and filters are chained together. This is pretty classic; most systems do something like this to reduce thread communication overhead. You can always decide that you do not want two operators to be in the same chain by calling "startNewChain()".

(2) Slot sharing: This is an optimization to keep small tasks from occupying entire slots (or JVMs), and to make it easier to reason about how many slots are needed, which is basically as many as the maximum parallelism. By default, one slot can hold one instance of each operator, but not two instances of the same kind (such as two instances of the same source). You can always say that you do not want to share a slot by calling "startNewResourceGroup()".

Hope that helps!

Greetings,
Stephan

On Wed, Oct 21, 2015 at 11:34 PM, Ufuk Celebi <[hidden email]> wrote:
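Stephan's slot-sharing rule ("one of each operator per slot, but not two of the same kind") can be sketched as a small plain-Java model. This is an illustration under stated assumptions, not Flink's scheduler. It assumes that within one sharing group, slot i receives subtask i of every task that has more than i subtasks, and that different groups never share slots. The group labels ("default", "separate") and all class and method names are hypothetical. Putting a task into its own group only roughly mirrors what startNewResourceGroup() is described as doing; Flink's exact placement rules may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of slot sharing (an illustration, not Flink's scheduler).
// Assumption: within one sharing group, slot i holds subtask i of every task that has
// more than i subtasks, so a slot never holds two subtasks of the same task.
class SlotSharingModel {

    static final class Task {
        final String name;
        final int parallelism;
        final String group; // hypothetical label for the task's sharing group

        Task(String name, int parallelism, String group) {
            this.name = name;
            this.parallelism = parallelism;
            this.group = group;
        }
    }

    // Slots needed = sum over groups of the highest parallelism in that group,
    // because (in this model) different groups never share slots.
    static int slotsNeeded(List<Task> tasks) {
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (Task t : tasks) {
            maxPerGroup.merge(t.group, t.parallelism, Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    // Number of subtasks that land in slot `slotIndex` of the given group.
    static int subtasksInSlot(List<Task> tasks, String group, int slotIndex) {
        int count = 0;
        for (Task t : tasks) {
            if (t.group.equals(group) && slotIndex < t.parallelism) {
                count++;
            }
        }
        return count;
    }

    // Jerry's job after chaining; the last task's sharing group is a parameter.
    static List<Task> jerrysJob(String lastTaskGroup) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Source: Kafka", 5, "default"));
        tasks.add(new Task("flatMap(Operator1)", 10, "default"));
        tasks.add(new Task("filter -> project -> flatMap(Operator3)", 20, "default"));
        tasks.add(new Task("flatMap(Operator4)", 20, lastTaskGroup));
        return tasks;
    }

    public static void main(String[] args) {
        List<Task> shared = jerrysJob("default");
        System.out.println("slots needed: " + slotsNeeded(shared));
        System.out.println("subtasks in slot 0: " + subtasksInSlot(shared, "default", 0));
        System.out.println("subtasks in slot 15: " + subtasksInSlot(shared, "default", 15));
        // Giving the last task its own group means its 20 subtasks need 20 extra slots.
        System.out.println("slots needed: " + slotsNeeded(jerrysJob("separate")));
    }
}
```

Running main prints 20 slots for the fully shared job. It shows 4 subtasks in slot 0 (one of each task) and 2 in slot 15 (only the two tasks with parallelism 20 reach that far). It prints 40 slots once the last task is moved into a separate group. The design choice of summing per-group maxima is what makes "max-parallelism many" slots sufficient when everything shares one group.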