Dear Flink users and developers,
I am trying to test scaling a Flink streaming application on a single node, and here I summarize my configuration and preliminary results. It would be very helpful if you could take some time to review my settings.

test application: flink-1.0.0/examples/streaming/WordCount.jar
input file: enwiki-20160305-pages-articles-multistream-index.txt (747,757,155 bytes)
(https://dumps.wikimedia.org/enwiki/20160305/enwiki-20160305-pages-articles-multistream-index.txt.bz2)

The running environment is as follows:

cpu: 4 * AMD Opteron 6378 (16 cores each), 2.4 GHz
memory: 120 GB
os: CentOS 7.2
vm: Java 8u74
flink: flink-1.0.0

My Flink configuration (only the modified settings):

jobmanager.heap.mb: 1024
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 64
taskmanager.network.numberOfBuffers: 8192

Keeping the configuration above the same throughout the test, I only changed parallelism.default for each test case:

parallelism.default: 1
parallelism.default: 2
parallelism.default: 4
parallelism.default: 8
parallelism.default: 16
parallelism.default: 32
parallelism.default: 64

The results are at the end of this message. The job seems to scale well up to a parallelism of 8, and "Source -> Flat Map" usually scales better than "Keyed Aggregation -> Sink". The times of the parallelized subtasks of "Keyed Aggregation -> Sink" are more consistent than those of "Source -> Flat Map". Do you see anything in the Flink configuration or the JVM configuration (which I did not touch for this experiment) that I need to fix to improve performance? In particular, the result for parallelism 64 does not look good to me. I would also really appreciate any suggestions on what might be worth trying.

Thank you.
With best regards,
Shinhyung Yang

#==============================================================================
# parallelism.default: 1
#==============================================================================
Source: Read Text File Source -> Flat Map(1/1): 34m 57s
Keyed Aggregation -> Sink: Unnamed(1/1): 47m 05s

#==============================================================================
# parallelism.default: 2
#==============================================================================
Source: Read Text File Source -> Flat Map(1/2): 25m 56s
Source: Read Text File Source -> Flat Map(2/2): 26m 27s
Keyed Aggregation -> Sink: Unnamed(1/2): 34m 09s
Keyed Aggregation -> Sink: Unnamed(2/2): 34m 08s

#==============================================================================
# parallelism.default: 4
#==============================================================================
Source: Read Text File Source -> Flat Map(2/4): 21m 39s
Source: Read Text File Source -> Flat Map(1/4): 21m 39s
Source: Read Text File Source -> Flat Map(4/4): 22m 30s
Source: Read Text File Source -> Flat Map(3/4): 22m 31s
Keyed Aggregation -> Sink: Unnamed(3/4): 23m 20s
Keyed Aggregation -> Sink: Unnamed(1/4): 24m 13s
Keyed Aggregation -> Sink: Unnamed(4/4): 24m 57s
Keyed Aggregation -> Sink: Unnamed(2/4): 25m 35s

#==============================================================================
# parallelism.default: 8
#==============================================================================
Source: Read Text File Source -> Flat Map(1/8): 21m 06s
Source: Read Text File Source -> Flat Map(7/8): 21m 32s
Source: Read Text File Source -> Flat Map(2/8): 21m 45s
Source: Read Text File Source -> Flat Map(6/8): 21m 45s
Source: Read Text File Source -> Flat Map(4/8): 21m 45s
Source: Read Text File Source -> Flat Map(3/8): 21m 45s
Source: Read Text File Source -> Flat Map(8/8): 21m 55s
Source: Read Text File Source -> Flat Map(5/8): 21m 55s
Keyed Aggregation -> Sink: Unnamed(4/8): 22m 08s
Keyed Aggregation -> Sink: Unnamed(3/8): 22m 28s
Keyed Aggregation -> Sink: Unnamed(7/8): 22m 58s
Keyed Aggregation -> Sink: Unnamed(6/8): 22m 59s
Keyed Aggregation -> Sink: Unnamed(5/8): 22m 59s
Keyed Aggregation -> Sink: Unnamed(2/8): 23m 00s
Keyed Aggregation -> Sink: Unnamed(1/8): 23m 47s
Keyed Aggregation -> Sink: Unnamed(8/8): 23m 51s

#==============================================================================
# parallelism.default: 16
#==============================================================================
Source: Read Text File Source -> Flat Map(14/16): 08m 57s
Source: Read Text File Source -> Flat Map(5/16): 13m 00s
Source: Read Text File Source -> Flat Map(10/16): 16m 49s
Source: Read Text File Source -> Flat Map(15/16): 16m 49s
Source: Read Text File Source -> Flat Map(6/16): 17m 54s
Source: Read Text File Source -> Flat Map(2/16): 18m 40s
Source: Read Text File Source -> Flat Map(4/16): 19m 48s
Source: Read Text File Source -> Flat Map(8/16): 19m 48s
Source: Read Text File Source -> Flat Map(16/16): 20m 49s
Source: Read Text File Source -> Flat Map(9/16): 21m 06s
Source: Read Text File Source -> Flat Map(12/16): 21m 41s
Source: Read Text File Source -> Flat Map(3/16): 22m 08s
Source: Read Text File Source -> Flat Map(11/16): 22m 08s
Source: Read Text File Source -> Flat Map(1/16): 24m 13s
Source: Read Text File Source -> Flat Map(13/16): 24m 13s
Source: Read Text File Source -> Flat Map(7/16): 24m 14s
Keyed Aggregation -> Sink: Unnamed(2/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(12/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(6/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(8/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(14/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(3/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(15/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(4/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(9/16): 24m 23s
Keyed Aggregation -> Sink: Unnamed(7/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(11/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(13/16): 24m 24s
Keyed Aggregation -> Sink: Unnamed(1/16): 25m 07s
Keyed Aggregation -> Sink: Unnamed(5/16): 25m 08s
Keyed Aggregation -> Sink: Unnamed(10/16): 25m 19s
Keyed Aggregation -> Sink: Unnamed(16/16): 25m 21s

#==============================================================================
# parallelism.default: 32
#==============================================================================
Source: Read Text File Source -> Flat Map(14/32): 05m 25s
Source: Read Text File Source -> Flat Map(2/32): 05m 41s
Source: Read Text File Source -> Flat Map(26/32): 07m 24s
Source: Read Text File Source -> Flat Map(15/32): 09m 35s
Source: Read Text File Source -> Flat Map(9/32): 10m 23s
Source: Read Text File Source -> Flat Map(11/32): 10m 40s
Source: Read Text File Source -> Flat Map(31/32): 10m 40s
Source: Read Text File Source -> Flat Map(27/32): 10m 41s
Source: Read Text File Source -> Flat Map(20/32): 13m 25s
Source: Read Text File Source -> Flat Map(29/32): 15m 02s
Source: Read Text File Source -> Flat Map(5/32): 15m 43s
Source: Read Text File Source -> Flat Map(16/32): 16m 00s
Source: Read Text File Source -> Flat Map(21/32): 16m 18s
Source: Read Text File Source -> Flat Map(6/32): 17m 28s
Source: Read Text File Source -> Flat Map(10/32): 18m 37s
Source: Read Text File Source -> Flat Map(25/32): 18m 37s
Source: Read Text File Source -> Flat Map(19/32): 18m 37s
Source: Read Text File Source -> Flat Map(18/32): 19m 30s
Source: Read Text File Source -> Flat Map(28/32): 19m 48s
Source: Read Text File Source -> Flat Map(8/32): 20m 05s
Source: Read Text File Source -> Flat Map(7/32): 20m 05s
Source: Read Text File Source -> Flat Map(22/32): 20m 17s
Source: Read Text File Source -> Flat Map(1/32): 20m 34s
Source: Read Text File Source -> Flat Map(32/32): 21m 56s
Source: Read Text File Source -> Flat Map(13/32): 21m 56s
Source: Read Text File Source -> Flat Map(12/32): 21m 56s
Source: Read Text File Source -> Flat Map(3/32): 21m 56s
Source: Read Text File Source -> Flat Map(30/32): 21m 56s
Source: Read Text File Source -> Flat Map(24/32): 22m 25s
Source: Read Text File Source -> Flat Map(17/32): 22m 26s
Source: Read Text File Source -> Flat Map(23/32): 22m 38s
Source: Read Text File Source -> Flat Map(4/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(12/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(21/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(22/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(11/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(18/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(5/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(24/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(13/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(6/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(9/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(26/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(2/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(3/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(28/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(15/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(19/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(7/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(27/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(25/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(23/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(14/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(17/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(20/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(29/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(32/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(31/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(4/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(8/32): 22m 55s
Keyed Aggregation -> Sink: Unnamed(30/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(1/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(10/32): 23m 31s
Keyed Aggregation -> Sink: Unnamed(16/32): 23m 32s

#==============================================================================
# parallelism.default: 64
#==============================================================================
Source: Read Text File Source -> Flat Map(13/64): 12m 48s
Source: Read Text File Source -> Flat Map(38/64): 13m 29s
Source: Read Text File Source -> Flat Map(47/64): 14m 37s
Source: Read Text File Source -> Flat Map(24/64): 17m 56s
Source: Read Text File Source -> Flat Map(25/64): 20m 47s
Source: Read Text File Source -> Flat Map(4/64): 20m 58s
Source: Read Text File Source -> Flat Map(45/64): 21m 55s
Source: Read Text File Source -> Flat Map(14/64): 21m 55s
Source: Read Text File Source -> Flat Map(53/64): 22m 13s
Source: Read Text File Source -> Flat Map(48/64): 23m 50s
Source: Read Text File Source -> Flat Map(50/64): 23m 50s
Source: Read Text File Source -> Flat Map(16/64): 24m 00s
Source: Read Text File Source -> Flat Map(8/64): 24m 11s
Source: Read Text File Source -> Flat Map(3/64): 24m 11s
Source: Read Text File Source -> Flat Map(12/64): 24m 23s
Source: Read Text File Source -> Flat Map(59/64): 24m 23s
Source: Read Text File Source -> Flat Map(49/64): 24m 23s
Source: Read Text File Source -> Flat Map(35/64): 25m 15s
Source: Read Text File Source -> Flat Map(64/64): 26m 11s
Source: Read Text File Source -> Flat Map(40/64): 27m 09s
Source: Read Text File Source -> Flat Map(54/64): 27m 38s
Source: Read Text File Source -> Flat Map(5/64): 28m 56s
Source: Read Text File Source -> Flat Map(28/64): 30m 43s
Source: Read Text File Source -> Flat Map(17/64): 30m 43s
Source: Read Text File Source -> Flat Map(11/64): 30m 43s
Source: Read Text File Source -> Flat Map(7/64): 31m 47s
Source: Read Text File Source -> Flat Map(18/64): 31m 54s
Source: Read Text File Source -> Flat Map(23/64): 31m 54s
Source: Read Text File Source -> Flat Map(32/64): 32m 46s
Source: Read Text File Source -> Flat Map(60/64): 32m 46s
Source: Read Text File Source -> Flat Map(52/64): 32m 46s
Source: Read Text File Source -> Flat Map(19/64): 33m 33s
Source: Read Text File Source -> Flat Map(10/64): 33m 33s
Source: Read Text File Source -> Flat Map(61/64): 33m 59s
Source: Read Text File Source -> Flat Map(2/64): 34m 18s
Source: Read Text File Source -> Flat Map(22/64): 34m 18s
Source: Read Text File Source -> Flat Map(6/64): 34m 18s
Source: Read Text File Source -> Flat Map(26/64): 34m 58s
Source: Read Text File Source -> Flat Map(56/64): 36m 22s
Source: Read Text File Source -> Flat Map(36/64): 37m 07s
Source: Read Text File Source -> Flat Map(42/64): 37m 19s
Source: Read Text File Source -> Flat Map(20/64): 37m 50s
Source: Read Text File Source -> Flat Map(55/64): 38m 34s
Source: Read Text File Source -> Flat Map(63/64): 38m 47s
Source: Read Text File Source -> Flat Map(1/64): 38m 47s
Source: Read Text File Source -> Flat Map(43/64): 40m 19s
Source: Read Text File Source -> Flat Map(51/64): 40m 37s
Source: Read Text File Source -> Flat Map(46/64): 41m 58s
Source: Read Text File Source -> Flat Map(9/64): 42m 04s
Source: Read Text File Source -> Flat Map(57/64): 42m 41s
Source: Read Text File Source -> Flat Map(33/64): 43m 25s
Source: Read Text File Source -> Flat Map(30/64): 43m 25s
Source: Read Text File Source -> Flat Map(44/64): 45m 04s
Source: Read Text File Source -> Flat Map(34/64): 46m 17s
Source: Read Text File Source -> Flat Map(39/64): 46m 40s
Source: Read Text File Source -> Flat Map(58/64): 46m 49s
Source: Read Text File Source -> Flat Map(27/64): 47m 45s
Source: Read Text File Source -> Flat Map(29/64): 47m 45s
Source: Read Text File Source -> Flat Map(31/64): 48m 21s
Source: Read Text File Source -> Flat Map(21/64): 48m 38s
Source: Read Text File Source -> Flat Map(41/64): 49m 41s
Source: Read Text File Source -> Flat Map(15/64): 53m 41s
Source: Read Text File Source -> Flat Map(37/64): 54m 27s
Source: Read Text File Source -> Flat Map(62/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(36/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(61/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(64/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(25/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(23/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(2/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(15/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(18/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(34/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(9/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(27/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(40/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(54/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(24/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(19/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(56/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(8/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(46/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(6/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(58/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(59/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(10/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(55/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(38/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(7/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(20/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(5/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(49/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(12/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(28/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(31/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(29/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(50/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(60/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(26/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(43/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(17/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(47/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(37/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(3/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(13/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(14/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(48/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(30/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(63/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(45/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(35/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(51/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(22/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(44/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(32/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(42/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(52/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(41/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(39/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(4/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(21/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(62/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(57/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(33/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(1/64): 54m 56s
Keyed Aggregation -> Sink: Unnamed(53/64): 55m 24s
Keyed Aggregation -> Sink: Unnamed(11/64): 55m 24s
Keyed Aggregation -> Sink: Unnamed(16/64): 55m 25s
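For reference, the two task names in the results correspond to the two operator chains of the bundled streaming WordCount: "Source -> Flat Map" is the text-file source chained with the tokenizing flat map, and "Keyed Aggregation -> Sink" is the per-word sum chained with the output sink. Below is a minimal sketch of an equivalent pipeline in the Flink 1.0 DataStream API; the input and output paths are placeholders, and the argument handling of the shipped example jar may differ slightly.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingWordCountSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // No parallelism is set in the program, so the job runs with the
        // cluster default (parallelism.default in flink-conf.yaml, or a -p
        // value passed to "flink run") -- the value varied from 1 to 64 above.

        // "Source: Read Text File Source -> Flat Map"
        DataStream<Tuple2<String, Integer>> counts = env
                .readTextFile("/path/to/enwiki-20160305-pages-articles-multistream-index.txt") // placeholder path
                .flatMap(new Tokenizer());

        // "Keyed Aggregation -> Sink": group by the word (field 0) and sum the counts (field 1)
        counts.keyBy(0)
              .sum(1)
              .writeAsText("/path/to/output"); // placeholder path

        env.execute("Streaming WordCount");
    }

    /** Splits each line into lowercase words and emits (word, 1) pairs. */
    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}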
Hi!

How many machines are you using for this?

The fact that you are giving 64 slots to each TaskManager means that a single TaskManager may end up executing all 64 pipelines. That would heavily overload that TaskManager and cause severe degradation.

If, for example, you use 16 machines, then give each machine 4 task slots (64 slots in total across all machines). That way, the final run (parallelism 64) is guaranteed to be spread across all machines.

Greetings,
Stephan

On Thu, Mar 31, 2016 at 4:50 PM, Shinhyung Yang <[hidden email]> wrote:
> Dear Flink users and developers,
Thank you for replying!
I am trying to do this on a single machine, in fact. Since it has 64 cores, it would be interesting to look at the performance in that regard.

> How many machines are you using for this?
>
> The fact that you are giving 64 slots to each TaskManager means that a
> single TaskManager may end up executing all 64 pipelines. That would heavily
> overload that TaskManager and cause heavy degradation.

Does it make sense to run multiple TaskManagers on a single machine if 64 slots are too many for one TaskManager?

> If, for example, you use 16 machines, then give each machine 4 task slots
> (total of 64 slots across all machines)
> That way, the final run (parallelism 64) will be guaranteed to be spread
> across all machines.

My intention for this experiment is to scale the application up on a single machine to its maximum before moving on to running the experiment on multiple machines.

Thank you again!
With best regards,
Shinhyung Yang
Hi,

usually it doesn't make sense to run multiple TaskManagers on a single machine just to get more slots. Your machine has only 4 CPU cores, so you would just be putting a lot of pressure on the CPU scheduler.

On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang <[hidden email]> wrote:
> Thank you for replying!
Hi,

I'm afraid no one read your email carefully. You indeed have one very big machine with 64 physical CPU cores and 120 GB of RAM, correct?

In that case, the amount of RAM you give to the TaskManager seems too low. Could you try re-running your experiments with:

jobmanager.heap.mb: 5000
taskmanager.heap.mb: 100000

So, about 100 GB of RAM for the TaskManager.

Cheers,
Aljoscha

On Mon, 4 Apr 2016 at 10:32 Robert Metzger <[hidden email]> wrote:
Just to clarify: Shinhyung is running on a single node with 4 CPUs,
each having 16 cores.

On Mon, Apr 4, 2016 at 10:32 AM, Robert Metzger <[hidden email]> wrote:
> Hi,
>
> usually it doesn't make sense to run multiple task managers on a single
> machine to get more slots.
> Your machine has only 4 CPU cores, so you are just putting a lot of pressure
> on the cpu scheduler.
>
> On Thu, Mar 31, 2016 at 7:16 PM, Shinhyung Yang <[hidden email]> wrote:
>>
>> Thank you for replying!
>>
>> I am trying to do this on a single machine in fact. Since it has 64
>> cores, it would be interesting to look at the performance in that
>> regard.
>>
>> > How many machines are you using for this?
>> >
>> > The fact that you are giving 64 slots to each TaskManager means that a
>> > single TaskManager may end up executing all 64 pipelines. That would
>> > heavily overload that TaskManager and cause heavy degradation.
>>
>> Does it make sense if I run multiple TaskManagers on a single machine
>> if 64 slots are too many for a TaskManager?
>>
>> > If, for example, you use 16 machines, then give each machine 4 task slots
>> > (total of 64 slots across all machines)
>> > That way, the final run (parallelism 64) will be guaranteed to be spread
>> > across all machines.
>>
>> My intention for the experiment at the moment is to try to scale the
>> application up on a single machine to its maximum before moving on to
>> run the experiment on multiple machines.
>>
>> Thank you again!
>> With best regards,
>> Shinhyung Yang
Dear Aljoscha and Ufuk,
Thank you for clarifying! Yes, I'm running this WordCount application on a 64-core machine with 120 GB of RAM allocated for users.

> In that case, the amount of RAM you give to the TaskManager seems too low.
> Could you try re-running your experiments with:
> jobmanager.heap.mb: 5000
> taskmanager.heap.mb: 100000
>
> So, about 100 GB of RAM for the TaskManager.

I will definitely try this! The result will be really interesting for sure. In this case, am I still good to go with 64 task slots on a single TaskManager?

Thank you.
With best regards,
Shinhyung Yang
Hi,

I am not sure, since people normally don't run Flink on such large machines; they rather run it on many smaller machines. It will definitely be interesting to see your new results, where the job can actually use all the memory available on the machine.

--
aljoscha

On Mon, 4 Apr 2016 at 15:54 Shinhyung Yang <[hidden email]> wrote:
> Dear Aljoscha and Ufuk,