Hi,
I have the following program configured with parallelism 2. After running this example I see only 2 slots are busy. How can I ensure counts1 and counts2 are executed on their own slots with the given parallelism (in this case 2 slots each)?

port = params.getInt("port");

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(params.getInt("paral", 2));
env.setMaxParallelism(params.getInt("paral", 2));

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);

DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() { ... });

DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() { ... });

counts1.writeAsText(params.get("output1"));
counts2.writeAsText(params.get("output2"));

env.execute("Socket Window WordCount");

——

./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2

Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]

01/26/2017 22:08:46 Job execution switched to status RUNNING.
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to SCHEDULED
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to DEPLOYING
01/26/2017 22:08:46 Flat Map(1/2) switched to SCHEDULED
01/26/2017 22:08:46 Flat Map(1/2) switched to DEPLOYING
01/26/2017 22:08:46 Flat Map(2/2) switched to SCHEDULED
01/26/2017 22:08:46 Flat Map(2/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to RUNNING
01/26/2017 22:08:46 Flat Map(1/2) switched to RUNNING
01/26/2017 22:08:46 Flat Map(2/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING

Best,
Ovidiu
Hi Ovidiu,

you can control the slot assignment by assigning operators to slot sharing groups:

someStream.filter(...).slotSharingGroup("name");

Operators in different groups are scheduled to different slots. By default, all operators are in the same group. Have a look at the docs as well [1].

Best,
Fabian

2017-01-26 22:30 GMT+01:00 Ovidiu-Cristian MARCU <[hidden email]>:
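To make this concrete for the job in the question, a minimal sketch (untested; "window1" / "window2" are made-up group names, and the WindowFunction bodies are elided exactly as in the original):

DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() { ... })
        .slotSharingGroup("window1");   // this branch gets its own slots

DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() { ... })
        .slotSharingGroup("window2");   // different group, so it cannot share slots with "window1"

If I read the scheduler right, each slot sharing group needs as many slots as the highest operator parallelism inside it, so with --paral 2 this should occupy two slots per window branch plus two for the default group (source and flat map): six slots in total instead of two.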
Thank you, Fabian!
It works. Here is what I did and the results, as an example for other users. The total number of occupied slots is 7. (I am not sure how to check that Source + Flat Map are in the same slot; I assumed slot S1 holds both. Also, S6 and S7 are different, although I set the same slot sharing group name for both sinks.)

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

DataStream<IN> input = text.flatMap(...)
        .slotSharingGroup("PInput").setParallelism(1);                              // ONE SLOT: S1

DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() { ... })
        .slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1);    // ONE SLOT: S2

DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
        .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() { ... })
        .slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3);   // THREE SLOTS: S3, S4, S5

counts1.writeAsText(params.get("output1")).slotSharingGroup("output").setParallelism(1);   // ONE SLOT: S6
counts2.writeAsText(params.get("output2")).slotSharingGroup("output").setParallelism(1);   // ONE SLOT: S7

env.execute("Socket Window WordCount");

Best,
Ovidiu
Now I see (the documentation is clear); just a correction: because I set PInput as the slot sharing group for the flatMap, the source and the flatMap are in different slots. That also means S6 and S7 are the same slot, as expected, because both sinks share the slot sharing group "output".

Best,
Ovidiu
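P.S. If I read the resource docs right, a job needs as many slots per slot sharing group as the highest parallelism of any operator in that group, so the seven slots add up like this:

default       (Source, parallelism 1)      -> 1 slot
PInput        (Flat Map, parallelism 1)    -> 1 slot
firstWindow   (window 1, parallelism 1)    -> 1 slot
secondWindow  (window 2, parallelism 3)    -> 3 slots
output        (both sinks, parallelism 1)  -> 1 slot (shared)
                                     total:   7 slots

And to pull the source into the same slot as the flat map, the group can presumably be set on the source itself (untested sketch, reusing the "PInput" group from above):

DataStream<String> text = env.socketTextStream("localhost", port, "\n")
        .slotSharingGroup("PInput");   // source joins the flat map's group, saving one slot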