Resource Planning


Thomas Wang

Hi,


I'm trying to figure out whether we have allocated enough resources (i.e. CPU and memory) to each task node to run a deduplication job. Currently, the job is not running very stably. What I have been observing is that after a couple of days of running, backpressure suddenly shows up on one arbitrary EC2 instance in the cluster, and when that happens, we have to give up the current state and restart the job with an empty state. At that point we can no longer take a savepoint, as it would time out after 10 minutes, which is understandable.


Additional Observations

When the backpressure happens, we see our state read time (which we measure with a custom metric) increase from about 0.1 milliseconds to 40-60 milliseconds on that specific problematic EC2 instance. We tried rebooting that EC2 instance so that the corresponding tasks would be assigned to a different instance, but the problem persists.
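

For context, the read-time metric is essentially a histogram wrapped around the keyed-state lookup inside the deduplication operator (which runs on the stream keyed by the session key). A simplified sketch of that kind of measurement, with illustrative names and a placeholder Event type rather than our exact code, looks like this:

    import com.codahale.metrics.SlidingWindowReservoir;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
    import org.apache.flink.metrics.Histogram;
    import org.apache.flink.util.Collector;

    public class DedupWithReadTimer extends RichFlatMapFunction<Event, Event> {

        private transient ValueState<Boolean> seen;          // one entry per session key
        private transient Histogram stateReadTimeMicros;     // custom read-time metric

        @Override
        public void open(Configuration parameters) {
            seen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("seen", Types.BOOLEAN));
            // Dropwizard-backed histogram (flink-metrics-dropwizard)
            stateReadTimeMicros = getRuntimeContext().getMetricGroup().histogram(
                    "stateReadTimeMicros",
                    new DropwizardHistogramWrapper(
                            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
        }

        @Override
        public void flatMap(Event event, Collector<Event> out) throws Exception {
            long start = System.nanoTime();
            Boolean alreadySeen = seen.value();               // the RocksDB read being timed
            stateReadTimeMicros.update((System.nanoTime() - start) / 1_000);

            if (alreadySeen == null) {                        // first occurrence of this session key
                seen.update(true);
                out.collect(event);
            }
        }
    }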


However, I’m not sure if this read time increase is a symptom or the cause of the problem.


Background about this deduplication job:

We are doing sessionization with deduplication on an event stream, keyed by a session key that is embedded in each event. The throughput of the input stream is around 50k records per second, and the post-aggregation output is around 8k records per second.


We are currently using the RocksDB state backend with SSD support, and in the state we store session keys with a TTL of 1 week. Based on the current throughput, this state could become really large. I assume RocksDB will flush to disk as needed, but please correct me if I am wrong.
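

For concreteness, the 1-week TTL on that state is configured along these lines (a sketch; the descriptor name is illustrative, and it is the same per-session-key "seen" flag used for deduplication):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.Types;

    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.days(7))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupInRocksdbCompactFilter(1000)   // drop expired entries during RocksDB compaction
            .build();

    ValueStateDescriptor<Boolean> seenDescriptor =
            new ValueStateDescriptor<>("seen", Types.BOOLEAN);
    seenDescriptor.enableTimeToLive(ttlConfig);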


Information about the cluster:

I'm running on an AWS EMR cluster with 12 EC2 instances (r5d.2xlarge). I'm using Flink 1.11.2 in YARN session mode. Currently there is only one job running in the YARN session.


Questions:

1. Currently, I'm starting the YARN session with 7 GB of memory for both the TaskManager and the JobManager, so that each YARN container gets 1 CPU. Is this setting reasonable based on your experience?


Here is the command I used to start the YARN cluster:

export HADOOP_CLASSPATH=`hadoop classpath` && /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached


2. Is there a scientific way to tell the right amount of resources to give an arbitrary job, or is it more of a try-and-see process?


3. Right now, I suspect insufficient resources are causing the job to run unstably, but I'm not quite sure. Are there other potential causes here? How should I debug from here if resources are not the issue? Is there a way to detect memory leaks?


Thanks in advance!


Thomas



Re: Resource Planning

Xintong Song
Hi Thomas,

It would be helpful if you could provide the JobManager/TaskManager logs, and GC logs if possible.

Additionally, you may consider monitoring the CPU/memory related metrics [1] to see if there's anything abnormal when the problem is observed.


Re: Resource Planning

rmetzger0
Hi Thomas,

My gut feeling is that you can use the available resources more efficiently.

What's the size of a checkpoint for your job (you can see that from the UI)? 

Given that your cluster has an aggregate of 64 * 12 = 768 GB of memory available, you might be able to do everything in memory (I might be off by a few terabytes here; it all depends on your state size ;) ).

1. In my experience, it is usually more efficient to have a few large Flink instances than many small ones. Maybe try running 12 TaskManagers (or 11, to make the JM fit) with 58 GB of memory each (the JM can stick to the 7 GB) and see how Flink behaves; for example, something like the command below.
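
Assuming the same session-mode setup, and that YARN on those instances actually has that much memory to hand out per node (EMR's default NodeManager allocation may be lower, in which case size the TM to whatever YARN allows), that would look roughly like:

export HADOOP_CLASSPATH=`hadoop classpath` && /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 58g --detached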

2. I'd say it's a try-and-see process, with a few educated guesses. Maybe check out https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines to get some inspiration for making "back of the napkin" calculations on the sizing requirements.
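
As a quick example of that kind of napkin math: at 50k records/s and an assumed ~1 KB per record, you are ingesting roughly 50 MB/s cluster-wide, i.e. about 4 MB/s per node across 12 nodes, which is far below what an r5d.2xlarge can sustain for network and disk. So raw throughput is probably not the constraint; state access and memory are the more likely suspects.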

3. Do you have some monitoring of CPU / memory / network usage in place? 
It would be interesting to see what the metrics look like when everything is OK vs. when the job is backpressured.

Best,
Robert 



Re: Resource Planning

Rommel Holmes
Hi Xintong and Robert,

Thanks for the reply.

The checkpoint size for our job is 10-20 GB since we are doing incremental checkpointing; if we take a savepoint, it can be as big as 150 GB.
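
(For what it's worth, 150 GB spread over 11-12 TaskManagers works out to only about 13-14 GB of state per TaskManager, so with 58 GB TaskManagers that should comfortably fit in memory.)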

1) We will try making the Flink instances bigger.
2) Thanks for the pointer; we will take a look.

3) We do have CPU and memory monitoring. When the job is backpressured, the CPU load increases from 25% to 50% with a spikier shape, but it does not hit 100%. As for memory, we monitor (Heap.Committed - Heap.Used) per host; when backpressure happened, there was still roughly 500 MB of heap headroom on that host.

What we observed is that when backpressure happened, the state read slowness appeared on one of the hosts, and across different TaskManagers on that host. The state read time (a metric we create and measure ourselves) on that host shoots up from 0.x ms to 40-60 ms.

We also observed that when this happens, the running compaction time for RocksDB on that host gets longer, from about 1 minute to over 2 minutes, while other hosts stay at roughly 1 minute.

We also observed that when this happens, the "size of active and unflushed immutable memtables" metric increases more slowly than it did before the backpressure.
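
(In case it's useful for others: metrics like these can be exposed through Flink's RocksDB native metrics options in flink-conf.yaml, e.g. the ones below; the exact set of available keys depends on the Flink version, and enabling them adds some overhead.)

    state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
    state.backend.rocksdb.metrics.num-running-compactions: true
    state.backend.rocksdb.metrics.compaction-pending: true
    state.backend.rocksdb.metrics.estimate-pending-compaction-bytes: true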

I can provide more context if you are interested. We are still debugging this issue.

Rommel

Re: Resource Planning

rmetzger0
Hi,

Since your state (150 GB) seems to fit into memory (~700 GB across the cluster), I would recommend trying the HashMapStateBackend: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend (unless you know that your state size is going to increase a lot soon).
If you make that switch, I'd guess you'll see a nice performance improvement.
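
For reference, on Flink 1.13+ that switch looks roughly like this (the checkpoint path is just a placeholder):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new HashMapStateBackend());
    // keep checkpoints on durable storage, e.g. S3
    env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/flink-checkpoints");

On Flink 1.11.2 the closest equivalent would be the heap-based FsStateBackend; either way you'd want to size the TaskManager heap (rather than managed memory) accordingly.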

At the moment I have no idea where else to look for the issue you are describing, but it seems that there are a few things for you to try out to optimize the resource allocation.
