TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0


DONG, Weike
Hi community,

Recently we have noticed strange behavior for Flink jobs in Kubernetes per-job mode: as the parallelism increases, the time it takes for the TaskManagers to register with the JobManager becomes abnormally long (for a task with a parallelism of 50, a registration attempt can take 60 to 120 seconds or even longer), and usually more than 10 attempts are needed to complete the registration.

Because of this, we could not submit a job requiring more than 20 slots with the default configuration, as the TaskManager would say:
 
Registration at JobManager (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2) attempt 9 timed out after 25600 ms
Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The slot 60d5277e138a94fb73fc6691557001e0 has timed out. 
Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.425gb (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)}, allocationId: 60d5277e138a94fb73fc6691557001e0, jobId: 493cd86e389ccc8f2887e1222903b5ce).
java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed out. 
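
For reference, the 25600 ms in the log line above is what a doubling retry timeout would reach on the 9th attempt if it started at 100 ms. The following is a minimal sketch of that arithmetic, not Flink's actual code: the class and method names are hypothetical, and the 100 ms initial value and 30 s cap are assumptions chosen to match the log.

```java
class RegistrationBackoff {

    // Timeout for a 1-based attempt number: the initial value doubled once per
    // earlier attempt, never exceeding maxMillis.
    static long timeoutForAttempt(int attempt, long initialMillis, long maxMillis) {
        long timeout = Math.min(initialMillis, maxMillis);
        for (int i = 1; i < attempt && timeout < maxMillis; i++) {
            timeout = Math.min(timeout * 2, maxMillis);
        }
        return timeout;
    }

    // Total time spent waiting if attempts 1..attempts all time out.
    static long totalWaited(int attempts, long initialMillis, long maxMillis) {
        long total = 0;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            total += timeoutForAttempt(attempt, initialMillis, maxMillis);
        }
        return total;
    }

    public static void main(String[] args) {
        long initialMillis = 100;   // assumed initial timeout
        long maxMillis = 30_000;    // assumed cap on a single attempt
        for (int attempt = 1; attempt <= 12; attempt++) {
            System.out.printf("attempt %d: %d ms%n",
                    attempt, timeoutForAttempt(attempt, initialMillis, maxMillis));
        }
    }
}
```

Under these assumed values, nine failed attempts add up to 51100 ms of waiting, and every later attempt waits for the full cap. Raising `cluster.registration.max-timeout`, as in the workaround below, only lifts that cap.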

To cope with this issue, we had to change the following configuration parameters:

# Prevent "Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources. Stopping the JobMaster for job"
slot.request.timeout: 500000 
# Increase max timeout in a single attempt
cluster.registration.max-timeout: 300000
# Prevent "free slot (TaskSlot)"
akka.ask.timeout: 10 min
# Prevent "Heartbeat of TaskManager timed out."
heartbeat.timeout: 500000

However, we acknowledge that this is only a temporary, dirty fix, which is not what we want. During TaskManager registration with the JobManager, many warning messages like the following appear in the logs:

No hostname could be resolved for the IP address 9.166.0.118, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.

Initially we thought this was probably the cause (a reverse DNS lookup might take a long time). However, we later found that the reverse lookup took less than 1 ms, so this may not be the reason.

Also, we have checked the GC log of both TaskManagers and JobManager, and they seem to be perfectly normal, without any signs of pauses. And the heartbeats are processed as normal according to the logs.

Moreover, TaskManagers register quickly with the ResourceManager but extremely slowly with the JobManager, so a slow network connection is not the cause.

So we wonder what could cause the slow registration between the JobManager and the TaskManager(s). There are no warning or error messages in the logs (DEBUG level) other than the "No hostname could be resolved" messages, which is quite weird.

Thanks for reading, and we hope to get some insights into this issue : )

Sincerely,
Weike

 

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

DONG, Weike
Hi community,

I have uploaded the log files of JobManager and TaskManager-1-1 (one of the 50 TaskManagers) with DEBUG log level and default Flink configuration, and it clearly shows that TaskManager failed to register with JobManager after 10 attempts.

Here is the link:

JobManager: 

TaskManager-1-1: 

Thanks : )

Best regards,
Weike


In reply to DONG, Weike's message of Mon, Oct 12, 2020 at 4:14 PM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Till Rohrmann
Hi Weike,

thanks for posting the logs. I will take a look at them. My suspicion would be that there is some operation blocking the JobMaster's main thread which causes the registrations from the TMs to time out. Maybe the logs allow me to validate/falsify this suspicion.

Cheers,
Till

In reply to DONG, Weike's message of Mon, Oct 12, 2020 at 10:43 AM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Till Rohrmann
Hi Weike,

could you try setting kubernetes.jobmanager.cpu: 4 in your flink-conf.yaml? I fear that a single CPU is too low for the JobManager component.

Cheers,
Till

In reply to Till Rohrmann's message of Tue, Oct 13, 2020 at 11:33 AM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

DONG, Weike
Hi Till and community,

Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this issue but does not eliminate it.

After adding DEBUG logs to the internals of flink-runtime, we have found the culprit is 
inetAddress.getCanonicalHostName()
in org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName and org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName, which can take about 6 seconds to complete and thereby severely blocks the Akka dispatcher(s).
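
To illustrate why a blocking call hurts here, the following sketch queues several requests on a single-threaded executor, which stands in for a main thread that handles messages one at a time. It is a simplified model and not Flink or Akka code; the class name, the request count, and the 20 ms sleep that replaces the slow lookup are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockedMainThreadDemo {

    // Queues `requests` tasks on one thread; each task blocks for blockMillis.
    // Returns, per request, the milliseconds between queueing the batch and
    // the moment that request finished.
    static List<Long> completionLatencies(int requests, long blockMillis) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        try {
            final long start = System.nanoTime();
            List<Future<Long>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                pending.add(mainThread.submit(() -> {
                    Thread.sleep(blockMillis); // stand-in for a slow reverse DNS lookup
                    return (System.nanoTime() - start) / 1_000_000;
                }));
            }
            List<Long> latencies = new ArrayList<>();
            for (Future<Long> future : pending) {
                latencies.add(future.get());
            }
            return latencies;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("simulation failed", e);
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Long> latencies = completionLatencies(5, 20);
        for (int i = 0; i < latencies.size(); i++) {
            System.out.printf("request %d finished after %d ms%n", i + 1, latencies.get(i));
        }
    }
}
```

Each request finishes only after all earlier ones, so latency grows linearly with queue position. Scaled to 50 registrations that each hit a lookup of about 6 seconds, the last one would wait around 300 seconds in this model.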

After commenting out the two methods, the issue seemed to be solved immediately, so I wonder if Flink could provide a configuration parameter to turn off the reverse DNS lookup, as it seems that Flink jobs can run happily without it.

Sincerely,
Weike


In reply to Till Rohrmann's message of Tue, Oct 13, 2020 at 6:52 PM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

DONG, Weike
Hi Till and community,

By the way, initially I resolved the IPs several times but results returned rather quickly (less than 1ms, possibly due to DNS cache on the server), so I thought it might not be the DNS issue. 

However, after debugging and logging, we found that the lookup time exhibits high variance, i.e. it normally completes quickly, but occasional slow lookups block the thread. So an unstable DNS server might have a great impact on the performance of Flink job startup.
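
A variance like this only shows up when many lookups are sampled. The sketch below times repeated reverse lookups with `InetAddress.getCanonicalHostName()` and reports the minimum, median, and maximum. It is a hypothetical helper, not part of Flink; it creates a fresh `InetAddress` per sample because an instance may cache its canonical name, and resolver or OS caches can still hide slow lookups.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;

class ReverseLookupTimer {

    // Times `samples` reverse lookups of the given raw IP address.
    // Returns the duration of each lookup in microseconds, in sampling order.
    static long[] timeLookups(byte[] rawAddress, int samples) {
        long[] micros = new long[samples];
        for (int i = 0; i < samples; i++) {
            InetAddress address;
            try {
                // New instance per sample, so a name cached on the instance is not reused.
                address = InetAddress.getByAddress(rawAddress);
            } catch (UnknownHostException e) {
                throw new IllegalArgumentException("not a valid raw IP address", e);
            }
            long start = System.nanoTime();
            address.getCanonicalHostName(); // the call under test
            micros[i] = (System.nanoTime() - start) / 1_000;
        }
        return micros;
    }

    public static void main(String[] args) throws UnknownHostException {
        // Pass an IP literal such as a TaskManager pod IP; defaults to loopback.
        byte[] raw = args.length > 0
                ? InetAddress.getByName(args[0]).getAddress()
                : InetAddress.getLoopbackAddress().getAddress();
        long[] sorted = timeLookups(raw, 100);
        Arrays.sort(sorted);
        System.out.printf("min=%d us, median=%d us, max=%d us%n",
                sorted[0], sorted[sorted.length / 2], sorted[sorted.length - 1]);
    }
}
```

A large gap between the median and the maximum would be consistent with the occasional slow results described above, whereas a handful of manual lookups can easily miss them.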

Best,
Weike

In reply to DONG, Weike's message of Thu, Oct 15, 2020 at 5:19 PM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Till Rohrmann
Hi Weike,

thanks for getting back to us with your findings. Looking at the `TaskManagerLocation`, we are actually calling `InetAddress.getCanonicalHostName` twice for every creation of a `TaskManagerLocation` instance. This does not look right.

I think it should be fine to make the lookup configurable. Moreover, one could think about doing the lookup lazily, only if the canonical hostname is really needed (as far as I can see, it is only really needed for input split assignments and for the LocationPreferenceSlotSelectionStrategy to calculate how many TMs run on the same machine).
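
One possible shape for a configurable, lazy lookup is sketched below. The class `LazyHostLocation`, its `reverseLookupEnabled` flag, and the injectable resolver are hypothetical names for illustration; this is not the `TaskManagerLocation` implementation and not an existing Flink option.

```java
import java.net.InetAddress;
import java.util.function.Function;

class LazyHostLocation {

    private final InetAddress address;
    private final boolean reverseLookupEnabled;
    private final Function<InetAddress, String> resolver;

    // Filled on first access, so constructing the object never touches DNS.
    private String cachedHostName;

    LazyHostLocation(InetAddress address,
                     boolean reverseLookupEnabled,
                     Function<InetAddress, String> resolver) {
        this.address = address;
        this.reverseLookupEnabled = reverseLookupEnabled;
        this.resolver = resolver;
    }

    // Convenience factory that resolves through the JDK's reverse lookup.
    static LazyHostLocation create(InetAddress address, boolean reverseLookupEnabled) {
        return new LazyHostLocation(address, reverseLookupEnabled,
                InetAddress::getCanonicalHostName);
    }

    // Resolves at most once; falls back to the textual IP when lookups are disabled.
    synchronized String getHostName() {
        if (cachedHostName == null) {
            cachedHostName = reverseLookupEnabled
                    ? resolver.apply(address)
                    : address.getHostAddress();
        }
        return cachedHostName;
    }

    public static void main(String[] args) {
        InetAddress address = InetAddress.getLoopbackAddress();
        System.out.println("lookup disabled: " + create(address, false).getHostName());
        System.out.println("lookup enabled:  " + create(address, true).getHostName());
    }
}
```

In this sketch the potentially slow call runs on whichever thread first asks for the hostname, so callers that never need the name never pay for it. Whether that first access can still happen on a main thread is a separate question the sketch does not address.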

Do you want to fix this issue?

Cheers,
Till

In reply to DONG, Weike's message of Thu, Oct 15, 2020 at 11:38 AM.

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Chesnay Schepler
The InetAddress caches the result of getCanonicalHostName(), so it is not a problem to call it twice.

On 10/15/2020 1:57 PM, Till Rohrmann wrote:
Hi Weike,

thanks for getting back to us with your findings. Looking at the `TaskManagerLocation`, we are actually calling `InetAddress.getCanonicalHostName` twice for every creation of a `TaskManagerLocation` instance. This does not look right.

I think it should be fine to make the lookup configurable. Moreover, one could think about doing the lookup lazily, only when the canonical hostname is really needed (as far as I can see, it is only needed for input split assignments and for the LocationPreferenceSlotSelectionStrategy to calculate how many TMs run on the same machine).

Do you want to fix this issue?

Cheers,
Till
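
The lazy lookup suggested above could be sketched roughly as follows. This is only an illustration of the idea, not Flink's actual `TaskManagerLocation` code; `LazyHostName` and its methods are hypothetical names. The reverse lookup is deferred until the first caller actually asks for the host name, and the result is then kept for later calls:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Supplier;

final class LazyHostName {

    private final Supplier<String> resolver;
    private String cached; // stays null until the first call to get()

    LazyHostName(Supplier<String> resolver) {
        this.resolver = resolver;
    }

    /** Wraps the (potentially slow) reverse lookup without executing it. */
    static LazyHostName forAddress(InetAddress address) {
        return new LazyHostName(address::getCanonicalHostName);
    }

    /** Resolves on first use, then returns the remembered value. */
    synchronized String get() {
        if (cached == null) {
            cached = resolver.get();
        }
        return cached;
    }

    public static void main(String[] args) throws UnknownHostException {
        InetAddress address = InetAddress.getByAddress(new byte[] {127, 0, 0, 1});
        LazyHostName hostName = forAddress(address); // no lookup has happened yet
        System.out.println(hostName.get());          // the lookup happens here, once
    }
}
```

With this shape, code paths that never need the canonical host name never pay for the lookup, while input split assignment or slot selection would trigger it on demand.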

On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike <[hidden email]> wrote:
Hi Till and community,

By the way, initially I resolved the IPs several times and the results returned rather quickly (less than 1 ms, possibly due to a DNS cache on the server), so I thought it might not be a DNS issue.

However, after debugging and logging, we found that the lookup time exhibits high variance, i.e. it normally completes quickly, but occasionally a slow lookup blocks the thread. So an unstable DNS server might have a great impact on the performance of Flink job startup.

Best,
Weike
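
The variance described above is easy to miss when only a few lookups are timed. A small harness along these lines can surface the slow outliers by reporting the slowest of many lookups instead of a single sample. It is a sketch with hypothetical names (`LookupTimer`, `worstCaseMillis`); the loopback address in `main` is a placeholder to be replaced with a pod IP:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.function.Supplier;

final class LookupTimer {

    /** Runs the lookup once and returns the elapsed wall-clock time in ms. */
    static long timeMillis(Supplier<String> lookup) {
        long start = System.nanoTime();
        lookup.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Times the lookup `rounds` times and returns the slowest run in ms. */
    static long worstCaseMillis(Supplier<String> lookup, int rounds) {
        long worst = 0;
        for (int i = 0; i < rounds; i++) {
            worst = Math.max(worst, timeMillis(lookup));
        }
        return worst;
    }

    public static void main(String[] args) {
        byte[] ip = {127, 0, 0, 1}; // placeholder; substitute a TaskManager pod IP
        Supplier<String> reverseLookup = () -> {
            try {
                // A fresh InetAddress per round, so a host name remembered by
                // a previous instance is not reused.
                return InetAddress.getByAddress(ip).getCanonicalHostName();
            } catch (UnknownHostException e) {
                throw new IllegalStateException(e);
            }
        };
        System.out.println("slowest of 20 lookups: "
                + worstCaseMillis(reverseLookup, 20) + " ms");
    }
}
```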

On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike <[hidden email]> wrote:
Hi Till and community,

Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this issue but does not eliminate it.

After adding DEBUG logs to the internals of flink-runtime, we have found the culprit is 
inetAddress.getCanonicalHostName()
in org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName and org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName, which can take about 6 seconds to complete, so the Akka dispatcher(s) are severely blocked by it.

With the two methods commented out, the issue seems to be solved immediately, so I wonder whether Flink could provide a configuration parameter to turn off the reverse DNS lookup, as Flink jobs seem to run happily without it.

Sincerely,
Weike
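
One general way to keep a slow lookup from stalling a thread for seconds is to run it on a separate pool and give up after a deadline, falling back to the plain IP string. The sketch below illustrates that technique only; it is not what Flink does, and `BoundedLookup`, `resolveOrFallback` and the 200 ms deadline are assumptions made for the example. Note that the calling thread still waits up to the deadline, so this bounds the blocking rather than removing it:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class BoundedLookup {

    // Daemon threads, so a lookup that is still stuck never keeps the JVM alive.
    private static final ExecutorService LOOKUP_POOL =
            Executors.newCachedThreadPool(runnable -> {
                Thread thread = new Thread(runnable, "dns-lookup");
                thread.setDaemon(true);
                return thread;
            });

    /** Returns the lookup result, or the fallback if it fails or exceeds the deadline. */
    static String resolveOrFallback(Supplier<String> lookup, String fallback, long timeoutMillis) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(lookup, LOOKUP_POOL);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Simulates a reverse lookup that takes about 6 seconds.
        Supplier<String> slowLookup = () -> {
            try {
                Thread.sleep(6_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "tm-host.example";
        };
        // The deadline expires first, so the IP string is used instead.
        System.out.println(resolveOrFallback(slowLookup, "9.166.0.118", 200));
    }
}
```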


On Tue, Oct 13, 2020 at 6:52 PM Till Rohrmann <[hidden email]> wrote:
Hi Weike,

could you try setting kubernetes.jobmanager.cpu: 4 in your flink-conf.yaml? I fear that a single CPU is too low for the JobManager component.

Cheers,
Till

On Tue, Oct 13, 2020 at 11:33 AM Till Rohrmann <[hidden email]> wrote:
Hi Weike,

thanks for posting the logs. I will take a look at them. My suspicion would be that there is some operation blocking the JobMaster's main thread which causes the registrations from the TMs to time out. Maybe the logs allow me to validate/falsify this suspicion.

Cheers,
Till

On Mon, Oct 12, 2020 at 10:43 AM DONG, Weike <[hidden email]> wrote:
Hi community,

I have uploaded the log files of the JobManager and TaskManager-1-1 (one of the 50 TaskManagers) with DEBUG log level and the default Flink configuration; they clearly show that the TaskManager failed to register with the JobManager after 10 attempts.

Here is the link:

JobManager: 

TaskManager-1-1: 

Thanks : )

Best regards,
Weike





Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Yang Wang
I am afraid the InetAddress cache will not help here, because Kubernetes only creates A and SRV records for Services. It does not generate A records for pods as you might expect. Refer to [1][2] for more information. So the DNS reverse lookup will always fail. IIRC, the default timeout is 5s, which could explain the delay of "getHostName" or "getFqdnHostName".

I agree that we should add a config option to disable the DNS reverse lookup.




Best,
Yang
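
A switch to disable the reverse lookup could look roughly like this. It is a sketch of the idea only; the class name, method name and boolean flag are hypothetical, and no actual Flink option name is implied. When the flag is off, the textual IP is used and no DNS query is issued:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class HostNameResolver {

    /**
     * Returns the canonical host name when reverse lookup is enabled,
     * otherwise the textual IP address without touching DNS.
     */
    static String resolve(InetAddress address, boolean reverseLookupEnabled) {
        if (!reverseLookupEnabled) {
            return address.getHostAddress(); // formats the raw address, no lookup
        }
        return address.getCanonicalHostName(); // may block on the DNS server
    }

    public static void main(String[] args) throws UnknownHostException {
        // getByAddress only wraps the raw bytes; it performs no lookup itself.
        InetAddress pod = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
        System.out.println(resolve(pod, false));
    }
}
```

The cost of turning the lookup off is that anything relying on host names, such as local input split assignment, only sees IP strings.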




Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

DONG, Weike
In reply to this post by Till Rohrmann
Hi all,

Thanks for all the replies. I agree with Yang: we have found that for a pod without a Service (such as a TaskManager pod), the reverse DNS lookup always fails, so this lookup is unnecessary in a Kubernetes environment.

I am glad to help fix this issue to make Flink better : )

Best,
Weike


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Till Rohrmann
Great, thanks a lot Weike. I think the first step would be to open a JIRA issue, get assigned and then start on fixing it and opening a PR.

Cheers,
Till


Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

DONG, Weike
Hi Till,

Thank you for the kind reminder. I have created a JIRA ticket for this issue: https://issues.apache.org/jira/browse/FLINK-19677

Could you please assign it to me? I will try to submit a PR this weekend to fix this : ) 

Sincerely,
Weike

On Mon, Oct 12, 2020 at 4:14 PM DONG, Weike <[hidden email]> wrote:
Hi community,

Recently we have noticed a strange behavior for Flink jobs on Kubernetes per-job mode: when the parallelism increases, the time it takes for the TaskManagers to register with JobManager becomes abnormally long (for a task with parallelism of 50, it could take 60 ~ 120 seconds or even longer for the registration attempt), and usually more than 10 attempts are needed to finish this registration.

Because of this, we could not submit a job requiring more than 20 slots with the default configuration, as the TaskManager would say:
 
Registration at JobManager (akka.tcp://flink@myjob-201076.default:6123/user/rpc/jobmanager_2) attempt 9 timed out after 25600 ms
Free slot with allocation id 60d5277e138a94fb73fc6691557001e0 because: The slot 60d5277e138a94fb73fc6691557001e0 has timed out. 
Free slot TaskSlot(index:0, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=1.425gb (1530082070 bytes), taskOffHeapMemory=0 bytes, managedMemory=1.340gb (1438814063 bytes), networkMemory=343.040mb (359703515 bytes)}, allocationId: 60d5277e138a94fb73fc6691557001e0, jobId: 493cd86e389ccc8f2887e1222903b5ce).
java.lang.Exception: The slot 60d5277e138a94fb73fc6691557001e0 has timed out. 
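
The 25600 ms in the log above is consistent with a registration timeout that doubles after every failed attempt. The following is a minimal sketch of that arithmetic, not Flink code: the class and method names are hypothetical, and the initial timeout of 100 ms and the cap of 30000 ms are assumed to be the defaults of cluster.registration.initial-timeout and cluster.registration.max-timeout.

```java
/** Hypothetical sketch of a doubling registration timeout with an upper bound. */
final class RegistrationBackoff {

    /**
     * Returns the timeout in milliseconds for the given 1-based attempt,
     * assuming the timeout starts at initialMs, doubles after each failed
     * attempt, and never exceeds maxMs.
     */
    static long timeoutForAttempt(int attempt, long initialMs, long maxMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long timeout = Math.min(initialMs, maxMs);
        for (int i = 1; i < attempt; i++) {
            // Double the previous timeout, but stay below the configured cap.
            timeout = Math.min(timeout * 2, maxMs);
        }
        return timeout;
    }

    public static void main(String[] args) {
        // Assumed defaults: 100 ms initial timeout, 30000 ms maximum timeout.
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.println("attempt " + attempt + ": "
                    + timeoutForAttempt(attempt, 100, 30_000) + " ms");
        }
    }
}
```

Under these assumptions, attempt 9 gets 100 * 2^8 = 25600 ms, and attempt 10 would be capped at 30000 ms. Raising the cap to 300000 ms, as in the configuration below, only lets later attempts wait longer; it does not address whatever makes each attempt slow.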

In order to cope with this issue, we have to change the below configuration parameters:

# Prevent "Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources. Stopping the JobMaster for job"
slot.request.timeout: 500000 
# Increase max timeout in a single attempt
cluster.registration.max-timeout: 300000
# Prevent "free slot (TaskSlot)"
akka.ask.timeout: 10 min
# Prevent "Heartbeat of TaskManager timed out."
heartbeat.timeout: 500000

However, we acknowledge that this is only a temporary dirty fix, which is not what we want. During TaskManager registration with the JobManager, lots of warning messages appear in the logs:

No hostname could be resolved for the IP address 9.166.0.118, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.

Initially we thought this was probably the cause (a reverse DNS lookup might take a long time); however, we later found that the reverse lookup took less than 1 ms, so this may not be the cause.

Also, we have checked the GC log of both TaskManagers and JobManager, and they seem to be perfectly normal, without any signs of pauses. And the heartbeats are processed as normal according to the logs.

Moreover, TaskManagers register quickly with the ResourceManager but extremely slowly with the JobManager, so a slow network connection is not the cause.

Here we wonder what could be the cause of the slow registration between the JobManager and the TaskManager(s). There are no warning or error messages in the log (DEBUG level) other than the "No hostname could be resolved" messages, which is quite weird.

Thanks for reading, and we hope to get some insights into this issue : )

Sincerely,
Weike

 

Re: TaskManager takes abnormally long time to register with JobManager on Kubernetes for Flink 1.11.0

Till Rohrmann
Done, you are assigned now Weike.

Cheers,
Till

On Fri, Oct 16, 2020 at 1:33 PM DONG, Weike <[hidden email]> wrote:
Hi Till,

Thank you for the kind reminder, and I have created a JIRA ticket for this issue https://issues.apache.org/jira/browse/FLINK-19677

Could you please assign it to me? I will try to submit a PR this weekend to fix this : ) 

Sincerely,
Weike

On Fri, Oct 16, 2020 at 5:54 PM Till Rohrmann <[hidden email]> wrote:
Great, thanks a lot Weike. I think the first step would be to open a JIRA issue, get assigned and then start on fixing it and opening a PR.

Cheers,
Till

On Fri, Oct 16, 2020 at 10:02 AM DONG, Weike <[hidden email]> wrote:
Hi all,

Thanks for all the replies, and I agree with Yang, as we have found that for a pod without a service (like TaskManager pod), the reverse DNS lookup would always fail, so this lookup is not necessary for the Kubernetes environment.

I am glad to help fix this issue to make Flink better : )

Best,
Weike

On Thu, Oct 15, 2020 at 7:57 PM Till Rohrmann <[hidden email]> wrote:
Hi Weike,

thanks for getting back to us with your findings. Looking at the `TaskManagerLocation`, we are actually calling `InetAddress.getCanonicalHostName` twice for every creation of a `TaskManagerLocation` instance. This does not look right.

I think it should be fine to make the lookup configurable. Moreover, one could think about doing the lookup lazily, only when the canonical hostname is really needed (as far as I can see, it is only really needed for input split assignments and for the LocationPreferenceSlotSelectionStrategy to calculate how many TMs run on the same machine).
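
As a rough illustration of the two ideas above (a switch to disable the lookup, and resolving lazily on first use), here is a minimal sketch. It is not the actual `TaskManagerLocation` code: the class `LazyHostName`, the `lookupEnabled` flag, and the injectable `resolver` are all hypothetical names introduced for this example.

```java
import java.net.InetAddress;
import java.util.function.Function;

/** Hypothetical sketch of a lazily resolved, optionally disabled hostname lookup. */
final class LazyHostName {
    private final InetAddress address;
    private final boolean lookupEnabled;
    private final Function<InetAddress, String> resolver;
    // Cached result; null until the first call to get().
    private volatile String cached;

    LazyHostName(InetAddress address, boolean lookupEnabled) {
        this(address, lookupEnabled, InetAddress::getCanonicalHostName);
    }

    // The resolver is injectable so the behavior can be checked without real DNS.
    LazyHostName(InetAddress address, boolean lookupEnabled,
                 Function<InetAddress, String> resolver) {
        this.address = address;
        this.lookupEnabled = lookupEnabled;
        this.resolver = resolver;
    }

    /** Resolves on first use only; falls back to the textual IP when lookups are disabled. */
    String get() {
        String result = cached;
        if (result == null) {
            // Two threads may both resolve once; the race is benign because
            // either result is valid and later calls reuse the cached value.
            result = lookupEnabled ? resolver.apply(address) : address.getHostAddress();
            cached = result;
        }
        return result;
    }

    public static void main(String[] args) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // With the lookup disabled, no reverse DNS query is issued at all.
        System.out.println(new LazyHostName(loopback, false).get());
    }
}
```

With this shape, constructing the object never blocks on DNS, so a slow resolver can only delay the code paths that actually ask for the hostname.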

Do you want to fix this issue?

Cheers,
Till

On Thu, Oct 15, 2020 at 11:38 AM DONG, Weike <[hidden email]> wrote:
Hi Till and community,

By the way, initially I resolved the IPs several times but results returned rather quickly (less than 1ms, possibly due to DNS cache on the server), so I thought it might not be the DNS issue. 

However, after debugging and logging, we found that the lookup time exhibits high variance, i.e. it normally completes fast, but occasionally a slow lookup blocks the thread. So an unstable DNS server can have a great impact on the startup performance of Flink jobs.
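
One way to observe this kind of variance is to time many reverse lookups and compare the median with the maximum. The sketch below is only an illustration, not the instrumentation used in this thread; the class `LookupTimer` and its methods are hypothetical. It creates a fresh `InetAddress` for every sample on the assumption that an `InetAddress` instance caches its canonical hostname after the first call. Operating-system or resolver caches may still hide slow lookups.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.function.Supplier;

/** Hypothetical helper that times repeated lookups to expose latency variance. */
final class LookupTimer {

    /** Runs the lookup the given number of times and returns the sorted durations in nanoseconds. */
    static long[] sample(Supplier<String> lookup, int samples) {
        long[] nanos = new long[samples];
        for (int i = 0; i < samples; i++) {
            long start = System.nanoTime();
            lookup.get();
            nanos[i] = System.nanoTime() - start;
        }
        Arrays.sort(nanos);
        return nanos;
    }

    /** Reverse lookup on a fresh InetAddress so no per-instance cached name is reused. */
    static String reverseLookup(byte[] rawIp) {
        try {
            return InetAddress.getByAddress(rawIp).getCanonicalHostName();
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("IP address has an illegal length", e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Pass a pod IP as the first argument; defaults to the loopback address.
        byte[] raw = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1").getAddress();
        long[] sorted = sample(() -> reverseLookup(raw), 100);
        System.out.printf("median=%.3f ms, max=%.3f ms%n",
                sorted[sorted.length / 2] / 1e6, sorted[sorted.length - 1] / 1e6);
    }
}
```

A maximum that is far larger than the median would match the pattern described above: most lookups return quickly, while a few slow ones are enough to block the calling thread.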

Best,
Weike

On Thu, Oct 15, 2020 at 5:19 PM DONG, Weike <[hidden email]> wrote:
Hi Till and community,

Increasing `kubernetes.jobmanager.cpu` in the configuration alleviates this issue but does not eliminate it.

After adding DEBUG logs to the internals of flink-runtime, we have found that the culprit is the call to inetAddress.getCanonicalHostName() in org.apache.flink.runtime.taskmanager.TaskManagerLocation#getHostName and org.apache.flink.runtime.taskmanager.TaskManagerLocation#getFqdnHostName, which can take ~6 seconds to complete, so the Akka dispatcher(s) are severely blocked by it.

By commenting out the two methods, this issue seems to be solved immediately, so I wonder if Flink could provide a configuration parameter to turn off the DNS reverse lookup process, as it seems that Flink jobs could run happily without it.

Sincerely,
Weike

