parallelism for window operations

Ovidiu-Cristian MARCU
Hi,

I have the following program configured with parallelism 2.
After running this example I see only 2 slots are busy.

How can I ensure counts1 and counts2 are executed on their own slots with the given parallelism (in this case 2 slots each)?

port = params.getInt("port");

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(params.getInt("paral", 2));
env.setMaxParallelism(params.getInt("paral", 2));

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
DataStream<Double> counts1 = null;

counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
});

DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
});

counts1.writeAsText(params.get("output1"));
counts2.writeAsText(params.get("output2"));

env.execute("Socket Window WordCount");


——

./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]
01/26/2017 22:08:46 Job execution switched to status RUNNING.
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to SCHEDULED 
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to DEPLOYING 
01/26/2017 22:08:46 Flat Map(1/2) switched to SCHEDULED 
01/26/2017 22:08:46 Flat Map(1/2) switched to DEPLOYING 
01/26/2017 22:08:46 Flat Map(2/2) switched to SCHEDULED 
01/26/2017 22:08:46 Flat Map(2/2) switched to DEPLOYING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING 
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to RUNNING 
01/26/2017 22:08:46 Flat Map(1/2) switched to RUNNING 
01/26/2017 22:08:46 Flat Map(2/2) switched to RUNNING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING 
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING 

Best,
Ovidiu

Re: parallelism for window operations

Fabian Hueske-2
Hi Ovidiu,

You can control the slot assignment by assigning operators to slot sharing groups (SlotSharingGroups).
For example, like this:

someStream.filter(...).slotSharingGroup("name");

Operators in different groups are scheduled to different slots. By default, all operators are in the same group.
Have a look at the docs as well [1].

Best, Fabian
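
A note on how an operator ends up in a group when none is set explicitly. As far as I understand the Flink documentation, an operator inherits the slot sharing group of its inputs if all of its inputs are in the same group, and otherwise lands in the group named "default". The sketch below is a plain-Java toy model of that rule, not Flink code: the class SlotGroupInheritance, the Op type, and the operator names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model (not Flink API): resolves the effective slot sharing group of each operator.
public class SlotGroupInheritance {

    // Hypothetical operator description: name, explicit group (null if unset), input names.
    public static final class Op {
        final String name;
        final String explicitGroup;
        final List<String> inputs;

        public Op(String name, String explicitGroup, List<String> inputs) {
            this.name = name;
            this.explicitGroup = explicitGroup;
            this.inputs = inputs;
        }
    }

    // Operators must be listed so that every input appears before its consumer.
    public static Map<String, String> resolve(List<Op> ops) {
        Map<String, String> effective = new LinkedHashMap<>();
        for (Op op : ops) {
            String group = "default";
            if (op.explicitGroup != null) {
                // An explicitly assigned group always wins.
                group = op.explicitGroup;
            } else if (!op.inputs.isEmpty()) {
                // Inherit only when all inputs agree on one group.
                String first = effective.get(op.inputs.get(0));
                boolean allSame = true;
                for (String input : op.inputs) {
                    allSame &= Objects.equals(first, effective.get(input));
                }
                if (allSame && first != null) {
                    group = first;
                }
            }
            effective.put(op.name, group);
        }
        return effective;
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(
            new Op("source", null, List.of()),
            new Op("filter", "name", List.of("source")),
            new Op("sink", null, List.of("filter")));
        System.out.println(resolve(ops));
    }
}
```

In this model the source stays in "default", the filter is in "name", and the sink follows the filter into "name" because it has no group of its own.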

Re: parallelism for window operations

Ovidiu-Cristian MARCU
Thank you, Fabian!

It works. Here is what I did and the results, as an example for other users.
A total of 7 slots are occupied. I am not sure how to check that Source and Flat Map are in the same slot (I assumed they share slot S1). Also, S6 and S7 appear to be different slots, although I set the same slot sharing group name for both.

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

DataStream<IN> input = text.flatMap(...).slotSharingGroup("PInput").setParallelism(1); //ONE SLOT S1
DataStream<Double> counts1 = null;

counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
.apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() {
...
}).slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1); //ONE SLOT S2

DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
}).slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3); //THREE SLOTS S3, S4, S5

counts1.writeAsText(params.get("output1")).slotSharingGroup("output").setParallelism(1); //ONE SLOT S6
counts2.writeAsText(params.get("output2")).slotSharingGroup("output").setParallelism(1); //ONE SLOT S7

env.execute("Socket Window WordCount");

Best,
Ovidiu

Re: parallelism for window operations

Ovidiu-Cristian MARCU
Now I see (the documentation is clear). Just a correction:

Because I set PInput as the slot sharing group for flatMap, the source and flatMap are in different slots.
That also means S6 and S7 are the same slot, as expected, because both sinks share the slot sharing group "output".

Best,
Ovidiu 
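
The slot counts in this thread can be checked with a little arithmetic. The sketch assumes, as I read the slot sharing docs, that each slot sharing group needs as many slots as the highest parallelism among its operators, and that a job needs the sum over all of its groups. It is plain Java, not Flink API: SlotCount and Op are made-up names. The parallelism values come from the two programs above, and the source parallelism of 1 from the log line "Source: Socket Stream(1/1)".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (not Flink API): counts the slots a job needs under slot sharing.
public class SlotCount {

    // Hypothetical operator description: name, slot sharing group, parallelism.
    public static final class Op {
        final String name;
        final String group;
        final int parallelism;

        public Op(String name, String group, int parallelism) {
            this.name = name;
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    // Within one group, subtasks of different operators can share a slot,
    // so the group needs as many slots as its most parallel operator.
    public static Map<String, Integer> slotsPerGroup(List<Op> ops) {
        Map<String, Integer> perGroup = new LinkedHashMap<>();
        for (Op op : ops) {
            perGroup.merge(op.group, op.parallelism, Integer::max);
        }
        return perGroup;
    }

    // Different groups never share slots, so the totals add up.
    public static int totalSlots(List<Op> ops) {
        int total = 0;
        for (int slots : slotsPerGroup(ops).values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        // Original program: everything in the default group.
        List<Op> original = List.of(
            new Op("Source", "default", 1),
            new Op("Flat Map", "default", 2),
            new Op("counts1 window + sink", "default", 2),
            new Op("counts2 window + sink", "default", 2));

        // Modified program with explicit slot sharing groups.
        List<Op> modified = List.of(
            new Op("Source", "default", 1),
            new Op("Flat Map", "PInput", 1),
            new Op("counts1 window", "firstWindow", 1),
            new Op("counts2 window", "secondWindow", 3),
            new Op("sink output1", "output", 1),
            new Op("sink output2", "output", 1));

        System.out.println("original: " + totalSlots(original));  // 2
        System.out.println("modified: " + slotsPerGroup(modified)
            + " total " + totalSlots(modified));                   // total 7
    }
}
```

Under these assumptions the original job needs 2 slots and the modified job needs 1 + 1 + 1 + 3 + 1 = 7, which matches what was observed in both runs.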