Flink application does not scale as expected, please help!


Siew Wai Yow

Hi,


We found that our Flink application, which has simple logic implemented with a process function, does not scale beyond a parallelism of 8 even though sufficient resources are available. The results below are capped at ~250k TPS. No matter how we tune the parallelism of the operators it does not scale, and increasing the source parallelism does not help either.


Please refer to "scaleNotWork.png",
1. fixed source parallelism 4, other operators parallelism 8
2. fixed source parallelism 4, other operators parallelism 16
3. fixed source parallelism 4, other operators parallelism 32
4. fixed source parallelism 6, other operators parallelism 8
5. fixed source parallelism 6, other operators parallelism 16
6. fixed source parallelism 6, other operators parallelism 32
7. fixed source parallelism 6, other operators parallelism 64 (performance worse than parallelism 32).


Sample source code is attached (flink_app_parser_git.zip). It is a simple program that parses JSON records into objects and passes them to a Flink process function with empty logic. RocksDB is in use, and the source data is generated by the program itself, so this can be reproduced easily.


We chose Flink because of its scalability, but that is not what we are seeing now. We would appreciate any help, as this is impacting our projects. Thank you.


To run the program, use parameters such as the following (a sketch of how they might be wired into the job follows the list):

"aggrinterval=6000000 loop=7500000 statsd=1 psrc=4 pJ2R=32 pAggr=72 URL=do36.mycompany.com:8127"

  • aggrinterval: time in ms for the timer to trigger
  • loop: how many rows of data to feed
  • statsd: whether to send results to statsd
  • psrc: source parallelism
  • pJ2R: parallelism of the map operator (JsonRecTranslator)
  • pAggr: parallelism of the process+timer operator (AggregationDuration)
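A minimal sketch (not the attached code) of how parameters in this key=value form might be parsed and used to set the per-operator parallelism listed above. The generator source class is hypothetical; the map and process names follow the attached sample, and their constructor arguments are left out.

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParserJobSketch {
    public static void main(String[] args) throws Exception {
        // args are given as key=value pairs, e.g. "psrc=4 pJ2R=32 pAggr=72"
        Map<String, String> p = new HashMap<>();
        for (String arg : args) {
            String[] kv = arg.split("=", 2);
            p.put(kv[0], kv[1]);
        }
        int psrc  = Integer.parseInt(p.getOrDefault("psrc", "4"));
        int pJ2R  = Integer.parseInt(p.getOrDefault("pJ2R", "8"));
        int pAggr = Integer.parseInt(p.getOrDefault("pAggr", "8"));
        long loop = Long.parseLong(p.getOrDefault("loop", "1000000"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SelfGeneratingJsonSource(loop))      // hypothetical generator source
           .setParallelism(psrc)                               // psrc: source parallelism
           .map(new JsonToRecordTranslator(/* ... */))         // from the attached sample
           .name("JsonRecTranslator").setParallelism(pJ2R)     // pJ2R: map parallelism
           .keyBy(r -> r.getUNIQUE_KEY())
           .process(new ProcessAggregation(/* ... */))         // from the attached sample
           .name("AggregationDuration").setParallelism(pAggr); // pAggr: process parallelism

        env.execute("flink_app_parser sketch");
    }
}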

We are running on VMware with 5 Task Managers, each with 32 slots.


Architecture: x86_64
CPU op-mode(s): 32-bit, 64-bit
Byte Order: Little Endian
CPU(s): 32
On-line CPU(s) list: 0-31
Thread(s) per core: 1
Core(s) per socket: 1
Socket(s): 32
NUMA node(s): 1
Vendor ID: GenuineIntel
CPU family: 6
Model: 63
Model name: Intel(R) Xeon(R) CPU E5-2640 v3 @ 2.60GHz
Stepping: 2
CPU MHz: 2593.993
BogoMIPS: 5187.98
Hypervisor vendor: VMware
Virtualization type: full
L1d cache: 32K
L1i cache: 32K
L2 cache: 256K
L3 cache: 20480K
NUMA node0 CPU(s): 0-31
Flags: fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts mmx fxsr sse sse2 ss syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts nopl xtopology tsc_reliable nonstop_tsc aperfmperf pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hypervisor lahf_lm epb fsgsbase smep dtherm ida arat pln pts

              total   used   free   shared   buff/cache   available
Mem:             98     24     72        0            1          72
Swap:             3      0      3


Please refer to TM.png and JM.png for further details.
The test was run without checkpointing enabled.


Thanks.


Regards,

Yow



flink_app_parser_git.zip (108K) Download Attachment
JM.png (143K) Download Attachment
sample.png (72K) Download Attachment
scaleNotWork.png (109K) Download Attachment
TM.png (234K) Download Attachment

Re: Flink application does not scale as expected, please help!

Jörn Franke
How large is the input data? If the input data is very small then it does not make sense to scale further. The larger the data is, the more the parallelism will help. You can of course modify this behavior by changing the partitioning of the DataSet.
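A hedged side note, since the attached job appears to be a DataStream program: if the concern is that a low-parallelism source does not feed all downstream subtasks evenly, the distribution can be made explicit with rebalance(). This is a sketch reusing the operator names from the thread, not code from the attachment.

// Sketch: round-robin the source output across all map subtasks, so a
// source with parallelism 4-6 still spreads records over all pJ2R mappers.
sourceStringStream
    .rebalance()                                 // explicit round-robin redistribution
    .map(new JsonToRecordTranslator(/* ... */))
    .name("JsonRecTranslator")
    .setParallelism(pJ2R);

Flink's default distribution between two operators with different parallelism is already a rebalance; the call above only makes that explicit.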


Re: Flink application does not scale as expected, please help!

Siew Wai Yow

Hi Jörn, each input record is about 1 KB. In production there will be 10 billion records per day, and the volume will keep increasing, so scalability is very important for us. Unfortunately it does not work as expected even with only 10 million test records. The test application is just a simple Jackson map plus an empty process function. CPU and memory are not an issue, as we have 32 vCPUs + 100 GB RAM per TM. The network should be fine as well, as the total TX+RX peak is around 800 Mbps while we have 1000 Mbps. Would you mind sharing your thoughts, or testing the attached application in your lab?
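(Back-of-the-envelope, using the figures above: 10 billion records/day is roughly 115,000 records/s on average; at ~1 KB per record that is about 116 MB/s, i.e. roughly 0.9 Gbps of raw payload, so a 1000 Mbps link would already be near saturation at the average rate, before peaks or any shuffling between TMs.)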


Thank you!

Yow




Re: Flink application does not scale as expected, please help!

Siew Wai Yow

Hi, there is an interesting finding: the reason low parallelism works much better is that all tasks run in the same TM. Once we scale further, the tasks are distributed across different TMs and the performance is worse than in the low-parallelism case. Is this expected? The more I scale, the less I get?




Re: Flink application does not scale as expected, please help!

Jörn Franke
Can you share the app source on GitLab, GitHub, Bitbucket, etc.?


Re: Flink application does not scale as expected, please help!

Siew Wai Yow

Hi Jörn, please find the source at https://github.com/swyow/flink_sample_git

Thank you!




Re: Flink application does not scale as expected, please help!

Fabian Hueske-2
Hi,

Which Flink version are you using?
Did you try to analyze the bottleneck of the application, i.e., is it CPU, disk IO, or network bound?

Regarding the task scheduling. AFAIK, before version 1.5.0, Flink tried to schedule tasks on the same machine to reduce the amount of network transfer.
Hence, applications might scale better until tasks are scheduled to different machines.

Fabian


Re: Flink application does not scale as expected, please help!

Siew Wai Yow

Hi Fabian,


We are using Flink 1.5.0. Is there any difference in the scheduler in Flink 1.5.0?


"Hence, applications might scale better until tasks are scheduled to different machines."

This seems to be the case. We have 32 vCPUs and 16 slots per TM machine. Scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we scale to 32, performance drops and is not even on par with parallelism 16. Is this expected? Thank you.

Regards,
Yow



Re: Flink application does not scale as expected, please help!

Siew Wai Yow

*Additional info added below.



This seems to be the case. We have 32 vCPUs and 16 slots per TM machine, 5 TMs in total. Scaling works perfectly for 1-2-4-8-16 because everything happens in the same TM. When we scale to 32 (which spans different machines), performance drops and is not even on par with parallelism 16. Is this expected? Thank you.

Regards,
Yow



Re: Flink application does not scale as expected, please help!

Fabian Hueske-2
In reply to this post by Siew Wai Yow
Not sure if TM local assignment is explicitly designed in 1.5.0, but it might be an artifact of how slots are registered in the resource manager.
Till (in CC) should know how that works.

Anyway, tasks that run in the same TM exchange data via in-memory channels, which is of course much faster than going over the network.
So yes, a performance drop when tasks are scheduled to different TMs is not unexpected IMO.
You can check that by starting multiple TMs with a single slot each and running your job on that setup.

Best, Fabian
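A minimal sketch of the single-slot test setup suggested above, assuming a standalone cluster with the default config layout; treat the exact values as an example for this environment.

# conf/flink-conf.yaml on each TaskManager host
taskmanager.numberOfTaskSlots: 1

# start additional single-slot TaskManagers on a host
./bin/taskmanager.sh start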




Re: Flink application does not scale as expected, please help!

Ovidiu-Cristian MARCU
Hi all,

Allow me to add some comments/questions on this issue that is very interesting.
According to the documentation [1], the pipeline example assumes the source runs with the same parallelism as the successive map operator, and the workflow is optimized to collocate source and map tasks if possible.

For an application that configures the source with a different parallelism, assuming N task managers each with m slots: if I configure
the source operator with parallelism m, could all of the source's tasks be scheduled on the first task manager?
I think the same story holds for sink tasks.
So, in general, is there any control over the scheduling of source and sink tasks?
Would it be possible to enforce that source tasks are balanced across task managers? I am not sure if this is the default.
If the source creates a non-keyed stream, can we enforce the source to push records to local map tasks?

For Siew’s example, after source#map a keyBy complicates things further, since each key can potentially be processed on another task manager.
At least the keyBy'd operator should run with the same parallelism as the source and map and be pipelined in the same slot (maybe a slot sharing configuration could enforce that; see the sketch after the snippet below).

DataStream<Record> AggregatedRecordWithAuditStream = sourceStringStream
    .map(new JsonToRecordTranslator(markerFactory.getMarker(), inputlink))
        .name("JsonRecTranslator")
        .setParallelism(pJ2R)
    .keyBy(new KeySelector<Record, String>() {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Record r) throws Exception {
            return r.getUNIQUE_KEY();
        }
    })
    .process(new ProcessAggregation(aggrDuration, markerFactory.getMarker(), markerFactory.getMarker()))
        .setParallelism(pAggr)
        .name("AggregationDuration: " + aggrDuration + "ms");


Thanks,
Ovidiu





Reply | Threaded
Open this post in threaded view
|

Re: Flink application does not scale as expected, please help!

Siew Wai Yow

Thanks @Fabian for confirming the better performance when scaling happens within the same TM machine. It is funny, though, that it gives the impression of "the more I scale, the less I get" when performance drops as more TMs come into play.


@Ovidiu's question is interesting to know too. @Till, do you mind sharing your thoughts?

Thank you guys!





Reply | Threaded
Open this post in threaded view
|

Re: Flink application does not scale as expected, please help!

Till Rohrmann
Hi,

as Fabian explained, if you exceed the number of slots on a single TM, then Flink needs to deploy tasks on other TMs, which causes network transfer between the sources and the mappers. This will have a negative impact compared to a setup where everything runs locally.

The scheduling in Flink works as follows: Flink will try to co-locate consumer tasks with their producers in order to avoid costly network communication. If there are more than 8 producer tasks, then it is assumed that the consumer can be scheduled anywhere, because you could only read at most 1/8 of your input locally anyway.

The question now is how the sources are scheduled, because they determine how the mappers are scheduled. For tasks with no inputs, Flink picks a random slot, with no preference for spreading them across as many TMs as possible or for packing them as closely together as possible; either strategy might perform better depending on the actual use case. Moreover, the way scheduling works right now, the scheduler does not know how many source tasks will be scheduled. This means you don't necessarily know how many TMs you have to start in order to execute all source tasks and, thus, across how many TMs your sources will be spread. Due to the implementation of the SlotManager, which is responsible for slot management on the RM, all slots of a single TM are allocated before slots from another TM are used.

Due to the setup of your test (different parallelism for sources and mappers), you introduce a shuffle between the sources and the JSON mappers. This is not optimal, and I would suggest instantiating as many sources as you have mappers (see the sketch below). That way the two operators can be chained and run in the same thread, and the load of generating events is spread across the whole cluster instead of making a single machine responsible for producing all events. Moreover, to get rid of the measurement artifacts introduced by switching from local to remote communication, you could start your TMs with a single slot each, as Fabian suggested. Otherwise you might see a slight drop whenever you exceed a multiple of 16 by one.
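
A minimal sketch of that suggestion (the generator source class and the environment variable are placeholders, not the exact names from the attached job): give the source the same parallelism as the JSON mapper, so that the two are connected by a forward channel and chained into a single task:

int p = 32;  // one parallelism value shared by source and mapper

DataStream<String> sourceStringStream = env
    .addSource(new JsonGeneratorSource(loop))   // placeholder for the job's own generator source
    .setParallelism(p);                         // same parallelism as the mapper ...

DataStream<Record> records = sourceStringStream
    .map(new JsonToRecordTranslator(markerFactory.getMarker(), inputlink))
    .name("JsonRecTranslator")
    .setParallelism(p);                         // ... so source and map chain in one thread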

As a side note, if you don't need a full shuffle when changing the parallelism between two operators, you can also use the rescale() operation, which tries to minimize the communication between the two operators. For example, if your source has a parallelism of 2 and your mapper a parallelism of 4, rescale() would cause the first source subtask to talk only to mappers 1 and 2, and the second source subtask only to mappers 3 and 4.
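
A sketch of that rescale() variant (same placeholder source as above): with source parallelism 2 and mapper parallelism 4, each source subtask feeds only two of the four mapper subtasks instead of all of them:

DataStream<String> src = env
    .addSource(new JsonGeneratorSource(loop))   // placeholder source, as above
    .setParallelism(2);

DataStream<Record> mapped = src
    .rescale()                                  // source subtask 1 -> mappers 1+2, subtask 2 -> mappers 3+4
    .map(new JsonToRecordTranslator(markerFactory.getMarker(), inputlink))
    .setParallelism(4);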

Cheers,
Till




Reply | Threaded
Open this post in threaded view
|

Re: Flink application does not scale as expected, please help!

Fabian Hueske-2
In reply to this post by Siew Wai Yow
Hi Siew,

The hint about the lower source parallelism compared to the operator parallelism might be the right one.

Can you check whether all source tasks are scheduled on the same machine?
In that case your application might be bottlenecked by the outgoing network connection of that single machine.
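
(A rough back-of-the-envelope check, assuming the "1kb per record" mentioned earlier means roughly one kilobyte of payload: 250,000 records/s x 1 KB ≈ 250 MB/s ≈ 2 Gbit/s, which would already exceed a single 1 Gbit/s link before any serialization overhead. If it means one kilobit, the figure is still about 250 Mbit/s leaving one machine.)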

Best, Fabian







Reply | Threaded
Open this post in threaded view
|

Re: Flink application does not scale as expected, please help!

Siew Wai Yow
In reply to this post by Till Rohrmann

Thanks @Fabian and @Till for the good explanation. The picture is pretty clear now. As for the single-slot-TM test, it seems slots on the same machine are still allocated first, but with that setup the result is less spiky. I guess @Fabian is right that the network is our bottleneck in this case, since we only have 1 Gbps. We will fix it and test again.


Thanks guys, including @Sihua zhou, who helped to test the program in his lab :)


