CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?


CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hailu, Andreas

Hi,

 

We’re in the middle of testing the upgrade of our data processing flows from Flink 1.6.4 to 1.9.1. We’re seeing that flows which were running just fine on 1.6.4 now fail on 1.9.1 with the same application resources and input data size. It seems that there have been some changes around how the data is sorted prior to being fed to the CoGroup operator - this is the error that we encounter:

 

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 15 more
Caused by: java.lang.Exception: The data preparation for task 'CoGroup (Dataset | Merge | NONE)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    ... 1 more
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Lost connection to task manager 'd73996-213.dc.gs.com/10.47.226.218:46003'. This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1109)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:102)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:474)

 

I drilled further down into the YARN app logs, and I found that the container was running out of physical memory:

 

2019-11-19 12:49:23,068 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e42_1574076744505_9444_01_000004 because: Container [pid=42774,containerID=container_e42_1574076744505_9444_01_000004] is running beyond physical memory limits. Current usage: 12.0 GB of 12 GB physical memory used; 13.9 GB of 25.2 GB virtual memory used. Killing container.

 

This is what raised my suspicions, as this resource configuration worked just fine on 1.6.4.

 

I’m working on getting heap dumps of these applications to better understand what’s causing the blowup in required physical memory, but it would be helpful if anyone knows what relevant changes were made between these versions, or where else I could look. There are some features in 1.9 that we’d like to use in our flows, so this issue is blocking us until we get it sorted out, no pun intended.

 

Best,

Andreas




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hailu, Andreas

Going through the release notes today, we tried lowering the taskmanager.memory.fraction option to as little as 0.1, unfortunately with no success. The container still ends up running beyond its physical memory limits.

 

// ah

 

From: Hailu, Andreas [Engineering]
Sent: Tuesday, November 19, 2019 6:01 PM
To: '[hidden email]' <[hidden email]>
Subject: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Zhijiang(wangzhijiang999)
Hi Andreas,

You are running a batch job, so there should be no native memory used by the RocksDB state backend. My guess is that either heap memory or direct memory is being overused. The heap managed memory is mainly used by batch operators, and direct memory is used by the network shuffle. Can you check whether any logs indicate a heap OutOfMemory or direct OutOfMemory error before the container was killed? If the used memory exceeds the JVM configuration, it should throw that error, and then we can narrow down the scope further. I cannot recall the memory-related changes to managed memory or the network stack, especially since this spans several releases.

Best,
Zhijiang

------------------------------------------------------------------
From:Hailu, Andreas <[hidden email]>
Send Time:2019 Nov. 21 (Thu.) 01:03
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?


RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hailu, Andreas

Hi Zhijiang,

 

I looked into the container logs for the failure, and didn’t see any specific OutOfMemory errors before it was killed. I ran the application using the same config this morning on 1.6.4, and it went through successfully. I took a snapshot of the memory usage from the dashboard and can send it to you if you like for reference.

 

What stands out to me as suspicious is that on 1.9.1, the application is using nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its runtime and succeeds. The JVM heap memory itself never exceeds its capacity, peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes around mapped memory.
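The "Mapped" figure can also be checked from inside a JVM, independently of the dashboard. Below is a minimal sketch, assuming the dashboard value corresponds to the JVM's standard `mapped` buffer pool (that correspondence is an assumption, and `BufferPoolProbe` is a hypothetical name, not part of Flink). Mapped buffers live outside the Java heap, so they are not bounded by the heap size, yet their resident pages can still count toward a container's physical memory. With a heap peaking at 6.65GB and nearly 6GB mapped, the two together would be enough to approach or exceed a 12 GB container limit.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BufferPoolProbe {

    /** Returns the bytes currently used by each JVM buffer pool, keyed by pool name. */
    public static Map<String, Long> bufferPoolUsage() {
        Map<String, Long> usage = new LinkedHashMap<>();
        // HotSpot typically exposes pools named "direct" and "mapped".
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            usage.put(pool.getName(), pool.getMemoryUsed());
        }
        return usage;
    }

    public static void main(String[] args) {
        Runtime rt = Runtime.getRuntime();
        long heapUsed = rt.totalMemory() - rt.freeMemory();
        System.out.printf("heap used: %d MB%n", heapUsed >> 20);
        // Off-heap pools are reported separately from the heap figure above.
        bufferPoolUsage().forEach(
                (name, bytes) -> System.out.printf("%s pool: %d MB%n", name, bytes >> 20));
    }
}
```

Comparing these numbers between a 1.6.4 run and a 1.9.1 run of the same job should show whether the growth is in the heap or in the mapped pool.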

 

// ah

 

From: Zhijiang <[hidden email]>
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] <[hidden email]>; [hidden email]
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Piotr Nowojski-3
Hi,

I would suspect this:
To be the source of the problems.

There seems to be a hidden configuration option that avoids using memory mapped files:

taskmanager.network.bounded-blocking-subpartition-type: file

Could you test if it helps?

Piotrek

On 21 Nov 2019, at 15:22, Hailu, Andreas <[hidden email]> wrote:


RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hailu, Andreas

Thanks, Piotr. We’ll rerun our apps today with this and get back to you.

 

// ah

 

From: Piotr Nowojski <[hidden email]> On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: Zhijiang <[hidden email]>; [hidden email]
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Zhijiang(wangzhijiang999)
The hint about mmap usage below is really helpful for locating this problem. I had forgotten about this change, the biggest one for batch jobs in release 1.9.
The blocking subpartition type option can be set to `file`, as Piotr suggested, to get behavior similar to before. I think that should solve your problem.

------------------------------------------------------------------
From:Hailu, Andreas <[hidden email]>
Send Time:2019 Nov. 21 (Thu.) 23:37
To:Piotr Nowojski <[hidden email]>
Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Thanks, Piotr. We’ll rerun our apps today with this and get back to you.

 

// ah

 

From: Piotr Nowojski <[hidden email]> On Behalf Of Piotr Nowojski
Sent: Thursday, November 21, 2019 10:14 AM
To: Hailu, Andreas [Engineering] <[hidden email]>
Cc: Zhijiang <[hidden email]>; [hidden email]
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 

Hi,

 

I would suspect this:

To be the source of the problems.

 

There seems to be a hidden configuration option that avoids using memory mapped files:

 

taskmanager.network.bounded-blocking-subpartition-type: file

 

Could you test if it helps?

 

Piotrek



On 21 Nov 2019, at 15:22, Hailu, Andreas <[hidden email]> wrote:

 

Hi Zhijiang,

 

I looked into the container logs for the failure, and didn’t see any specific OutOfMemory errors before it was killed. I ran the application using the same config this morning on 1.6.4, and it went through successfully. I took a snapshot of the memory usage from the dashboard and can send it to you if you like for reference.

 

What stands out to me as suspicious is that on 1.9.1, the application is using nearly 6GB of Mapped memory before it dies, while 1.6.4 uses 0 throughout its runtime and succeeds. The JVM heap memory itself never exceeds its capacity, peaking at 6.65GB, so it sounds like the problem lies somewhere in the changes around mapped memory.
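
For readers wondering where a "Mapped" figure like this can come from: the JVM exposes a buffer pool named "mapped" through `BufferPoolMXBean`, and memory-mapped files are counted there rather than in the heap. The sketch below is a stand-alone illustration, assuming that the dashboard value is derived from this pool; `MappedMemoryProbe` and its methods are made-up names, not Flink code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical probe for illustration only. */
class MappedMemoryProbe {
    /** Bytes currently accounted to the JVM's "mapped" buffer pool (memory-mapped files). */
    static long mappedBytes() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("mapped".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return 0L;
    }

    /** Writes a temp file of the given size and returns a read-only mapping of it. */
    static MappedByteBuffer mapTempFile(int sizeBytes) {
        try {
            Path file = Files.createTempFile("mapped-probe", ".bin");
            file.toFile().deleteOnExit();
            Files.write(file, new byte[sizeBytes]);
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
                // The mapping stays valid after the channel is closed.
                return channel.map(FileChannel.MapMode.READ_ONLY, 0, sizeBytes);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long before = mappedBytes();
        MappedByteBuffer buffer = mapTempFile(4 * 1024 * 1024);
        long after = mappedBytes();
        // The mapped pool grows while the configured heap limit is unaffected.
        System.out.println("mapped pool grew by " + (after - before) + " bytes");
        System.out.println("heap limit is still " + Runtime.getRuntime().maxMemory() + " bytes");
        System.out.println("mapping capacity: " + buffer.capacity() + " bytes");
    }
}
```

This is consistent with the observation above that the heap stays within its capacity while the mapped figure climbs: the two are accounted separately.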

 

// ah

 

From: Zhijiang <[hidden email]> 
Sent: Wednesday, November 20, 2019 11:32 PM
To: Hailu, Andreas [Engineering] <[hidden email]>; [hidden email]
Subject: Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 

Hi Andreas,

 

You are running a batch job, so there should be no native memory used by the RocksDB state backend. So I guess either heap memory or direct memory is being overused. The heap managed memory is mainly used by batch operators, and direct memory is used by the network shuffle. Can you check whether there are any logs indicating HeapOutOfMemory or DirectOutOfMemory before the container was killed? If the used memory exceeds the JVM configuration, it should throw such an error. Then we can narrow down the scope further. I cannot remember the memory-related changes to managed memory or the network stack, especially since they span several releases.

 

Best,

Zhijiang

 

------------------------------------------------------------------

From:Hailu, Andreas <[hidden email]>

Send Time:2019 Nov. 21 (Thu.) 01:03

Subject:RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

 

Going through the release notes today - we tried fiddling with the taskmanager.memory.fraction option, going as low as 0.1 with unfortunately no success. It still leads to the container running beyond physical memory limits.

 

// ah

 


RE: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Hailu, Andreas

Zhijiang, Piotr, we made this change and it solved our mmap usage problem, so we can move forward in our testing. Thanks.

 

I’m curious – if I’m understanding this change in 1.9 correctly, were blocking result partitions being written via mmap, which in turn exhausted container memory? Is this why we were seeing failures in pipelines with operators that fed into a CoGroup?

 

// ah

 


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Piotr Nowojski-3
Hi,

Good to hear that you were able to work around the problem.

I’m not sure what the exact reason is why mmapped partitions caused those failures, but you are probably right that they caused some memory exhaustion. This memory is probably not capped by anything, but I would expect the kernel to release it instead of killing the container, unless it was not the kernel’s OOM killer that killed the container. Is that what’s happening? Is YARN keeping track of the used memory, and did Flink’s unbounded usage of mmapped files cause it to exceed this limit?

I’ve asked some colleagues to take a look here, but most of them are busy this week with Flink Forward Asia, so they might not respond immediately.

Piotrek


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

yingjie
In reply to this post by Hailu, Andreas
The new BlockingSubpartition implementation in 1.9 uses mmap for data reading
by default, which means it steals memory from the OS. The mmapped region memory
is managed by the JVM, so no OutOfMemory problem should be reported by the JVM,
and the OS memory is not exhausted either, so there should be no kernel OOM.
I think Piotr's suspicion is right: YARN tracked the memory used and killed
the TM (the mmapped region is also part of the process memory).

Giving the container a strict resource restriction (larger than the YARN
limit), which avoids the memory stealing, or using file instead of mmap, as
pointed out by Piotr, can solve the problem.

I think Flink may need to restrict the amount of memory that can be stolen.
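
To make the difference between the two read paths concrete, here is a small stand-alone Java sketch. It is not Flink's implementation; `ReadModes` and its method names are invented for illustration. One method reads a file through a fixed-size heap buffer, the other maps the whole file, in which case every touched page can count toward the process's resident memory until the kernel evicts it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical comparison of plain file reads and mmap reads. */
class ReadModes {
    /** Reads through a fixed-size heap buffer; the memory held is bounded by bufferSize. */
    static long sumWithFileReads(Path file, int bufferSize) {
        long sum = 0;
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            while (channel.read(buffer) != -1) {
                buffer.flip();
                while (buffer.hasRemaining()) {
                    sum += buffer.get() & 0xFF;
                }
                buffer.clear();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sum;
    }

    /** Maps the whole file (works only up to Integer.MAX_VALUE bytes per mapping). */
    static long sumWithMmap(Path file) {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size());
            long sum = 0;
            while (mapped.hasRemaining()) {
                sum += mapped.get() & 0xFF;
            }
            return sum;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Writes the bytes to a temp file that is deleted on JVM exit. */
    static Path writeTemp(byte[] data) {
        try {
            Path file = Files.createTempFile("read-modes", ".bin");
            file.toFile().deleteOnExit();
            return Files.write(file, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path file = writeTemp(new byte[] {1, 2, 3, (byte) 250});
        System.out.println(sumWithFileReads(file, 2)); // prints 256
        System.out.println(sumWithMmap(file));         // prints 256
    }
}
```

Both paths return the same data; what differs is where the bytes live while they are being read, which is what a container memory monitor ends up seeing.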



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Piotr Nowojski-3
Thanks for the confirmation. I’ve created a Jira ticket to track this issue [1].


Piotrek


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

bupt_ljy
In reply to this post by Hailu, Andreas

Hi,


I’ve met the exact same problem recently and solved it in Piotr’s way. @zhijiang, I didn’t see any OOM error thrown by the JVM (I’m not sure one can be thrown if YARN decides to kill the container forcibly). According to our monitoring system, the memory overuse comes from JVM direct memory.


The interesting part is that the old way works if I increase -XX:MaxDirectMemorySize to around 3 GB (it was around 2 GB before). So I suspect we need to reserve at least one #ByteBuffer’s size in #memoryMappedRegions for #MappedByteBuffer (which is 2 GB for large files). I’m not sure I’m right about this.
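
One background fact behind the 2 GB figure: `FileChannel.map` accepts at most `Integer.MAX_VALUE` bytes per mapping, so a larger file has to be covered by several mapped regions. The sketch below only illustrates that arithmetic. `MappedRegions.regionSizes` is a hypothetical helper, and how Flink actually sizes its regions is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing how a large file splits into mappable regions. */
class MappedRegions {
    /**
     * Returns the sizes of the mappings needed to cover fileSize bytes when each
     * mapping may hold at most maxRegionSize bytes.
     */
    static List<Long> regionSizes(long fileSize, long maxRegionSize) {
        if (fileSize < 0 || maxRegionSize <= 0 || maxRegionSize > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("invalid sizes");
        }
        List<Long> sizes = new ArrayList<>();
        long remaining = fileSize;
        while (remaining > 0) {
            long size = Math.min(remaining, maxRegionSize);
            sizes.add(size);
            remaining -= size;
        }
        return sizes;
    }

    public static void main(String[] args) {
        // A 5 GiB file with the largest region size a single mapping allows.
        List<Long> sizes = regionSizes(5L << 30, Integer.MAX_VALUE);
        System.out.println(sizes.size() + " regions: " + sizes);
    }
}
```

For the 5 GiB example this yields three regions: two of `Integer.MAX_VALUE` bytes and a final one holding the remainder.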


@yingjie   Do you have any idea how much memory will be stolen from OS when using mmap for data reading?



Best,

Jiayi Liao



Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Piotr Nowojski-3
Hi,

@yingjie   Do you have any idea how much memory will be stolen from OS when using mmap for data reading?

I think this is bounded only by the size of the written data. Also, it will not be “stolen from the OS”, as the kernel controls the number of pages currently residing in RAM depending on the current memory pressure (the same way as the file cache).

Piotrek
  

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

yingjie
In reply to this post by bupt_ljy
Piotr is right: that depends on the size of the data you are reading and on the memory
pressure. The memory occupied by mmapped regions can be recycled and used
by other processes if memory pressure is high; that is, other processes or
services on the same node won't be affected, because the OS will recycle the
mmapped pages if needed. But currently you can't assume a bound on the
memory that can be used, because it will use more memory as long as there is free
space and you have more new data to read.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Victor Wong-2
Hi,

We encountered a similar issue: the task manager kept being killed by YARN.

- flink 1.9.1
- heap usage is low.

But our job is a **streaming** job, so I want to ask whether this issue is related only to **batch** jobs. Thanks!

Best,
Victor



Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

Piotr Nowojski-3
Hi,

Yes, it is only related to **batch** jobs, but not necessarily only to DataSet API jobs. If, for example, you are using Blink SQL/Table API to process bounded data streams (tables), the issue could also be visible there. If that is not your case, I would suggest starting a new thread on the user mailing list and posting the details (what you are running, job manager/task manager logs, …).

Piotrek
