Fw: Flink Cluster Load Distribution Question

11 messages

Fw: Flink Cluster Load Distribution Question

amir bahmanyari
Hi Robert,
Sure, I am forwarding it to the user list. Sorry about that. I followed the "robot's" instructions :))
Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB RAM). Let's call them node1, 2, 3, 4.
Flink is clustered with node1 running the JM and a TM; three more TMs run on node2, node3, and node4 respectively.
I have a Beam app running with the Flink Runner underneath.
The input data is read by Beam TextIO() from a 1.6 GB file containing roughly 22 million tuples.
All nodes have identical flink-conf.yaml, masters & slaves contents as follows:

flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096



masters:
node1:8081

slaves:
node1
node2
node3
node4

Everything looks normal after ./start-cluster.sh; all daemons start on all nodes.
JM and TM log files get generated on all nodes.
The Dashboard shows all slots being used.
I deploy the Beam app to the cluster from node1, where the JM is running.
A *.out file gets generated as data is being processed. There is no *.out on the other nodes, just on node1 where I deployed the fat jar.
I tail -f the *.out log on node1 (master). It starts fine but slowly degrades and becomes extremely slow.
As we speak, I started the Beam app 13 hrs ago and it's still running.
How can I prove that ALL NODES are involved in processing the data at the same time, i.e., that it is really running clustered?
Do the above configurations look OK for reasonable performance?
Given the above parameters, how can I improve the performance in this cluster?
What other information and/or dashboard screenshots are needed to clarify this issue?
I used these websites to do the configuration:






In the second link, there is a config recommendation for the following parameter, but it is not in the configuration file out of the box:
  • taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value, i.e. 32 KB, doesn't get picked up?
Sorry for the many questions.
Please let me know.
I appreciate your help.
Cheers,
Amir-

----- Forwarded Message -----
From: Robert Metzger <[hidden email]>
To: "[hidden email]" <[hidden email]>; amir bahmanyari <[hidden email]>
Sent: Tuesday, September 13, 2016 1:15 AM
Subject: Re: Flink Cluster Load Distribution Question

Hi Amir,

I would recommend posting such questions to the [hidden email] mailing list in
the future. This list is meant for development-related topics.

I think we need more details to understand why your application is not
running properly. Can you quickly describe what your topology is doing?
Are you setting the parallelism to a value >= 1?
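To make the parallelism question concrete, here is a minimal sketch of setting it explicitly through the Beam Flink runner's pipeline options. The FlinkPipelineOptions / FlinkRunner names are assumptions about the Flink runner API and may differ between Beam versions, and ParallelismSketch is just a placeholder class:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ParallelismSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);   // older Beam releases named this FlinkPipelineRunner
    options.setParallelism(64);             // should match parallelism.default / total task slots
    Pipeline p = Pipeline.create(options);
    // ... apply the actual transforms here, then call p.run()
  }
}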

Regards,
Robert


On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
[hidden email]> wrote:

> Hi Colleagues,
> Just joined this forum. I have done everything possible to get a 4-node
> Flink cluster to work properly & run a Beam app. It always generates
> system-output logs (*.out) on only one node. It is extremely slow for 4
> nodes being there. It seems like the load is not distributed amongst all
> 4 nodes but stays on only one node, most of the time the one where the JM
> runs. I ran/tested it on a single node, and it ran the same load even
> faster. Not sure what's not being configured right.
> 1- Why am I getting the SystemOut .out log on only one server? All nodes
> get their TaskManager log files updated though.
> 2- Why don't I see load being distributed amongst all 4 nodes, but only
> one all the time?
> 3- Why does the Dashboard show a 0 (zero) for Send/Receive numbers for
> all Task Managers?
> The Dashboard shows all the right stuff. top shows not much resource
> stress on any of the nodes. I can share its contents if it helps
> diagnose the issue. Thanks + I appreciate your valuable time, response &
> help.
> Amir-



Re: Fw: Flink Cluster Load Distribution Question

Aljoscha Krettek
Hi,
this is a different job from the Kafka Job that you have running, right?

Could you maybe post the code for that as well?

Cheers,
Aljoscha




Re: Fw: Flink Cluster Load Distribution Question

amir bahmanyari
Hi Aljoscha,
Thanks for your response. It's the same job, but I am reading through TextIO() instead of from a Kafka topic.
I thought that would make a difference. It doesn't. Same slowness in the Flink cluster.
I had sent you the code that reads from KafkaIO().
Nothing is different except commenting out the KafkaIO() and un-commenting TextIO().
It's attached along with the Support class.
Is there anything in my configuration that might cause slowness and/or a lack of proper distribution across the cluster as a whole?
I also attached my config files from the JM node; they are the same on the other nodes.
Have a wonderful day & thanks for your attention.
Amir-









Attachments:
BenchBeamRunners.java (24K)
BeamAppSupport.java (13K)
flink-conf.yaml (7K)
masters (18 bytes)
slaves (34 bytes)

Re: Fw: Flink Cluster Load Distribution Question

amir bahmanyari
In reply to this post by Aljoscha Krettek
Hi Aljoscha,
The JM log is also attached. It seems like everything is OK and assigned to all nodes...
Not sure why I don't get the performance. :-(
Thanks+regards,
Amir-




Attachment: flink-abahman-jobmanager-1-beam1.log (101K)

Re: Fw: Flink Cluster Load Distribution Question

amir bahmanyari
In reply to this post by Aljoscha Krettek
Hi Aljoscha,
Experimenting on a relatively small file, with everything fixed except KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink cluster when reading tuples via TextIO().
I understand the network involvement in reading from a Kafka topic etc., but 50% is significant.
Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get the exact same performance; increasing the topic partitions doesn't improve anything.
I thought some of the 64 slots might each get multiple partitions, really pushing the parallelism to its limit, but 64 Kafka topic partitions vs. 400 Kafka topic partitions with #slots = 64 perform the same.

It's still slow for a relatively large file though.
Please advise if there is something I can try to improve the cluster performance.
Thanks+regards







Re: Fw: Flink Cluster Load Distribution Question

Aljoscha Krettek
One observation here is that you're only reading from one file. This will mean that you won't get any parallelism. Everything is executed on just one task/thread.

Cheers,
Aljoscha






Re: Flink Cluster Load Distribution Question

Aljoscha Krettek
This is not related to Flink, but in Beam you can read from a directory containing many files using something like this (from MinimalWordCount.java in Beam):

TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")

This will read all the files in the directory in parallel.
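Embedded in a minimal pipeline it would look roughly like this (a sketch only; the output path and class name are placeholders, using the TextIO.Read/TextIO.Write API of that Beam generation):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WildcardReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // The wildcard lets the source split over many files, so the read
    // can run in parallel instead of on a single task/thread.
    p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
     .apply(TextIO.Write.to("/tmp/lines"));   // placeholder sink
    p.run();
  }
}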

For reading from Kafka I wrote this on another thread of yours:
Are you sure that all your Kafka partitions contain data? Did you have a look at the Kafka metrics to see how the individual partitions are filled? If only one partition contains data, then you will only read data in one parallel instance of the sources. How are you writing your data to Kafka?

Flink/Beam should read from all partitions if all of them contain data. Could you please verify that all Kafka partitions contain data by looking at the metrics of your Kafka cluster? That would be a first step towards finding out where the problem lies.
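One quick way to verify this from the Java client (just a sketch, not code from this thread; the broker address and topic name are placeholders) is to look at the log-end offset of every partition and check that more than one of them is non-zero:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PartitionOffsetCheck {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafkahost:9092");   // placeholder broker
    props.put("group.id", "offset-check");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      String topic = "linearroad";                       // placeholder topic
      List<TopicPartition> partitions = new ArrayList<>();
      for (PartitionInfo info : consumer.partitionsFor(topic)) {
        partitions.add(new TopicPartition(topic, info.partition()));
      }
      consumer.assign(partitions);
      consumer.seekToEnd(partitions);                    // jump to the log-end offset
      for (TopicPartition tp : partitions) {
        // A partition whose end offset stays at 0 never received any data.
        System.out.println(tp + " end offset = " + consumer.position(tp));
      }
    }
  }
}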

By the way, your code uses Beam in a highly non-idiomatic way. Interacting with an outside database, such as Redis, will always be the bottleneck in such a job. Flink provides an abstraction for dealing with state that is vastly superior to using an external system. We recently did a blog post about rewriting a similar streaming use case using Flink's internal state: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe that's interesting for you.
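For illustration, keyed state in plain Flink looks roughly like the following (this is the Flink-native style the blog post discusses, not the Beam job from this thread; all names here are made up). It would be applied after a keyBy() on the stream:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keeps a per-key count in Flink's managed state instead of an external store like Redis.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
  private transient ValueState<Long> count;

  @Override
  public void open(Configuration parameters) {
    count = getRuntimeContext().getState(
        new ValueStateDescriptor<>("count", Long.class, 0L));
  }

  @Override
  public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
    long updated = count.value() + in.f1;   // state is local to the operator, no network round trip
    count.update(updated);
    out.collect(Tuple2.of(in.f0, updated));
  }
}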

Cheers,
Aljoscha

On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <[hidden email]> wrote:
Thanks so much Aljoscha 
Is there an example that shows how to read from multiple files accurately or from KafkaIO and get perfect parallelism pls?
Have a great weekend

Sent from my iPhone






Re: Flink Cluster Load Distribution Question

amir bahmanyari
Hi Aljoscha,
Thanks for your kind response.
- We are really benchmarking Beam & its runners, and it happened that we started with Flink.
Therefore, any change we make to the approach must be a Beam code change that automatically affects the underlying runner.
- I changed the TextIO() back to KafkaIO(), reading from a Kafka cluster instead of a single node. It's behaving fine except that the Kafka broker is running out of disk space;
I am working around that as we speak.
- I removed Redis as per your recommendation & replaced it with Java ConcurrentHashMaps... It is definitely a lot faster than before.
I cannot use a Flink-specific solution for this; it must be either something external, a Beam solution, or just a JVM solution. I picked ConcurrentHashMaps for now (a rough sketch of what I mean is below).
If I get past the Kafka broker disk space issue, and don't get an out-of-memory on the Flink servers within 3 hrs of runtime, I should start seeing the light :))
Please keep your fingers crossed; testing is underway for 10 expressways of Linear Road, and that's 9 GB of tuples expected to be processed in 3.5 hrs.
- The number of Kafka partitions in the topic = the total number of slots available on the Flink servers. Should I alter that for better performance?
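Roughly what I mean by the ConcurrentHashMap replacement (simplified; the real field and class names are different, and this uses the annotation-style DoFn, whereas older Beam versions override processElement directly). The important caveat is that a static map is only shared within one TaskManager JVM, not across the cluster:

import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.transforms.DoFn;

public class AccumulateFn extends DoFn<String, String> {
  // Per-JVM state: each TaskManager keeps its own copy of this map, so the
  // counts are not globally shared the way they were with Redis.
  private static final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

  @ProcessElement
  public void processElement(ProcessContext c) {
    String key = c.element();               // placeholder: the real code parses the tuple first
    long updated = counts.merge(key, 1L, Long::sum);
    c.output(key + "=" + updated);
  }
}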

Thanks Aljoscha & have a great weekend.
Amir-





Re: Flink Cluster Load Distribution Question

Aljoscha Krettek
Hi,
good to see that you're making progress! The number of partitions in the Kafka topic should be >= the number of parallel Flink Slots and the parallelism with which you start the program. You also have to make sure to write to all partitions and not just to one.
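On the writing side, a sketch of what "write to all partitions" means in practice (broker and topic names are placeholders): with Kafka's default partitioner of that era, records sent with a null key are spread round-robin over all partitions, while records that all carry the same key end up in a single partition:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SpreadProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafkahost:9092");   // placeholder broker
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (int i = 0; i < 1000; i++) {
        // Null key -> default partitioner round-robins over all partitions.
        // A constant key here would pin every record to one partition.
        producer.send(new ProducerRecord<>("linearroad", null, "tuple-" + i));
      }
    }
  }
}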

Cheers,
Aljoscha




Re: Flink Cluster Load Distribution Question

amir bahmanyari
Thanks!
Could you elaborate on writing to all partitions and not just one, please?
How can I make sure?
I see all partitions being consumed in the dashboard; they get listed when my Beam app starts, and the KafkaIO read operation gets associated with every single partition.
What else?
Thanks so much again

Sent from my iPhone

On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <[hidden email]> wrote:

Hi,
good to see that you're making progress! The number of partitions in the Kafka topic should be >= the number of parallel Flink Slots and the parallelism with which you start the program. You also have to make sure to write to all partitions and not just to one.

Cheers,
Aljoscha

On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <[hidden email]> wrote:
Hi Aljoscha,
Thanks for your kind response.
- We are really benchmarking Beam & its Runners and it happened that we started with Flink.
therefore, any change we make to the approach must be a Beam code change that automatically affects the underlying runner.
- I changed the TextIO() back to KafkaIO() reading from a Kafka cluster instead of a single node. Its behaving fine except that I am getting out of disk space by Kafka broker
& am working around it as we speak.
- I removed Redis as per your recommendation & replaced it with Java Concurrenthashmaps...Started to be a lot faster than before for sure.
I cannot use a FLink specific solution for this. Must be either an external something or a Beam solution or just JVM solution. I picked Concurrenthashmaps for now.
If I get by the Kafka  broker disk space issue, and dont get an out of memory by the flink servers in 3 hrs of runtime, I should be starting seeing the light :)) 
Pls keep your fingers crossed as testing is underway for 10 express ways of linear road and thats 9 GB of tuples expected to be processed in 3.5 hrs.
- Kafka partitions in the kafka topic = total number of slots available in flink servers. Should I alter that for better performance?

Thanks Aljoscha & have a great weekend.
Amir-


From: Aljoscha Krettek <[hidden email]>
To: Amir Bahmanyari <[hidden email]>; user <[hidden email]>
Sent: Sunday, September 18, 2016 1:48 AM
Subject: Re: Flink Cluster Load Distribution Question

This is not related to Flink, but in Beam you can read from a directory containing many files using something like this (from MinimalWordCount.java in Beam):

TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")

This will read all the files in the directory in parallel.

For reading from Kafka I wrote this on another thread of yours:
Are you sure that all your Kafka partitions contain data. Did you have a look at the Kafka metrics to see how the individual partitions are filled? If only one partition contains data, then you will only read data in one parallel instance of the sources. How are you writing your data to Kafka?

Flink/Beam should read from all partitions if all of them contain data. Could you please verify that all Kafka partitions contain data by looking at the metrics of your Kafka cluster, that would be a first step towards finding out where the problem lies.

By the way, your code uses Beam in a highly non-idiomatic way. Interacting with an outside database, such as Redis, will always be the bottleneck in such a job. Flink provides an abstraction for dealing with state that is vastly superior to using an external system. We recently did a blog post about rewriting a similar streaming use case using Flink's internal state: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe that's interesting for you.
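Just to illustrate what I mean by internal state, this is the general shape of Flink keyed state (a generic sketch, not tied to your job, and the exact descriptor constructors vary a bit across Flink versions):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// generic sketch of Flink keyed state: a per-key running count kept (and
// checkpointed) by Flink itself, no external store involved
// used on a keyed stream, e.g. stream.keyBy(0).flatMap(new CountFn())
public class CountFn extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
  private transient ValueState<Long> count;

  @Override
  public void open(Configuration parameters) {
    count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
  }

  @Override
  public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
    Long current = count.value();                  // null on the first element for a key
    long updated = (current == null ? 0L : current) + in.f1;
    count.update(updated);
    out.collect(Tuple2.of(in.f0, updated));
  }
}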

Cheers,
Aljoscha

On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <[hidden email]> wrote:
Thanks so much Aljoscha 
Is there an example that shows how to read from multiple files accurately or from KafkaIO and get perfect parallelism pls?
Have a great weekend

Sent from my iPhone

On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <[hidden email]> wrote:

One observation here is that you're only reading from one file. This will mean that you won't get any parallelism. Everything is executed on just one task/thread.

Cheers,
Aljoscha

On Thu, 15 Sep 2016 at 01:24 amir bahmanyari <[hidden email]> wrote:
Hi Aljoscha,
Experimenting on a relatively smaller file, with everything fixed except KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink cluster when reading tuples by TextIO().
I understand the network involvement in reading from a Kafka topic etc., but 50% is significant.
Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get the exact same performance & increasing the topic partitions doesn't improve anything.
I thought some of the 64 slots might get multiple partitions (over-parallelism), really pushing it to its limit, but 64 Kafka topic partitions & 400 Kafka topic partitions with #slots=64 perform the same.

It's still slow for a relatively large file though.
Pls advise if there is something I can try to improve the cluster performance.
Thanks+regards


From: Aljoscha Krettek <[hidden email]>
To: [hidden email]; amir bahmanyari <[hidden email]>
Sent: Wednesday, September 14, 2016 1:48 AM
Subject: Re: Fw: Flink Cluster Load Distribution Question

Hi,
this is a different job from the Kafka Job that you have running, right?

Could you maybe post the code for that as well?

Cheers,
Aljoscha








Re: Flink Cluster Load Distribution Question

Aljoscha Krettek
How are you writing your data to Kafka? In that code you have to make sure not to write to only one partition of your topic.
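For example, if the producer fixes the key (or the partition) for every record, everything lands in one partition; sending without a key lets the default partitioner spread the records (a sketch with the plain Kafka Java producer; broker and topic are placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// sketch: records sent without a key get spread over the topic's partitions
// by the default partitioner, instead of all landing in a single partition
public class KeylessProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    List<String> lines = Arrays.asList("tuple-1", "tuple-2", "tuple-3");   // stand-in for your tuples
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      for (String line : lines) {
        producer.send(new ProducerRecord<>("your-topic", line));   // no key -> partitioner rotates
      }
    }
  }
}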
