Hi All,

We have hit some load-related issues and were wondering if anyone has some suggestions. We are noticing task managers and job managers being detached from each other under load and never really syncing up again. As a result, the Flink session shows 0 slots available for processing. Even though the apps are configured to restart, it isn't really helping, as there are no slots available to run them.

Here are excerpts from the logs that seemed relevant. (I am trimming out the rest of the logs for brevity.)

Job Manager:
2018-01-19 12:38:00,423 INFO org.apache.flink.runtime.jobmanager.JobManager - Starting JobManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:00,792 INFO org.apache.flink.runtime.jobmanager.JobManager - Maximum heap size: 16384 MiBytes
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - Hadoop version: 2.6.5
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - JVM Options:
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xms16384m
2018-01-19 12:38:00,794 INFO org.apache.flink.runtime.jobmanager.JobManager - -Xmx16384m
2018-01-19 12:38:00,795 INFO org.apache.flink.runtime.jobmanager.JobManager - -XX:+UseG1GC
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:00,908 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
2018-01-19 12:53:34,671 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:37840]
2018-01-19 12:53:34,676 INFO org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@<jm-host>:37840/user/taskmanager terminated.

-- So once the Flink session boots up, we are hitting it with pretty heavy load, which typically results in the WARN above.

Task Manager:
2018-01-19 12:38:01,002 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager (Version: 1.4.0, Rev:3a9d9f2, Date:06.12.2017 @ 11:08:40 UTC)
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - Hadoop version: 2.6.5
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - JVM Options:
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xms16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -Xmx16384M
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:MaxDirectMemorySize=8388607T
2018-01-19 12:38:01,367 INFO org.apache.flink.runtime.taskmanager.TaskManager - -XX:+UseG1GC
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2018-01-19 12:38:01,392 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.mb, 16384
2018-01-19 12:54:48,626 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@<jm-host>:6123]
2018-01-19 12:54:48,690 INFO akka.remote.Remoting - Quarantined address [akka.tcp://flink@<jm-host>:6123] is still unreachable or has not been restarted. Keeping it quarantined.
2018-01-19 12:54:48,774 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has a UID that has been quarantined. Association aborted.]
2018-01-19 12:54:48,833 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<tm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

<bunch of ERRORs on operations not shutdown properly - assuming because JM is unreachable>

2018-01-19 12:56:51,244 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@<jm-host>:6123/user/jobmanager (attempt 10, timeout: 30000 milliseconds)
2018-01-19 12:56:51,253 WARN akka.remote.Remoting - Tried to associate with unreachable remote address [akka.tcp://flink@<jm-host>:6123]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: [The remote system has quarantined this system. No further associations to the remote system are possible until this system is restarted.]

Thanks, Ashish
|
Thanks for this message. We also experience a very similar issue under heavy load. In the job manager logs we see AskTimeoutExceptions. This typically correlates with almost 100% CPU in the task manager. Even if the job is stopped, the task manager stays busy for minutes or even an hour, as if in `saturation` mode. We run two task managers, and while one is running at 100% CPU the other is running at 20%, which might be the cause of overloading one task manager. Pawel On 19 Jan 2018 18:28, "ashish pok" <[hidden email]> wrote:
|
In reply to this post by Ashish Pokharel
Hi, You should enable and check your garbage collection log. We've encountered a case where the Task Manager was disassociated due to a long GC pause.
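As a rough sketch of what enabling the GC log could look like in flink-conf.yaml: this assumes a Java 8 HotSpot JVM (the flags below are Java 8 flags and changed in later Java versions), and the log path is a placeholder, not something taken from this thread. `env.java.opts` applies to both the JobManager and the TaskManagers, so processes sharing a host would need distinct paths.

```yaml
# flink-conf.yaml (sketch; assumes Java 8, log path is a placeholder)
# Prints GC details with date stamps, plus the total time application
# threads were stopped, which is what matters for missed heartbeats.
env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/path/to/gc.log
```

Stopped times in the log that approach the heartbeat pause threshold around the moment of the "Detected unreachable" warning would point towards GC as the trigger.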
Regards, Kien On 1/20/2018 1:27 AM, ashish pok wrote:
|
Hi.
Did you find a reason for the detaching? I sometimes see the same on our system running Flink 1.4 on DC/OS. I have enabled taskmanager.debug.memory.startLogThread for debugging. Best regards, Lasse Nedergaard
|
I haven’t gotten much further with this. It doesn’t look GC-related; at least the GC counters were not that atrocious. However, my main concern is: once the load subsides, why aren’t the TM and JM connecting again? That doesn’t look normal. I could definitely tell the JM was listening on the port, and from the logs it does appear the TM is trying to message a JM that is still alive.
Thanks, Ashish
|
I've seen a similar issue while running successive Flink SQL batches on 1.4. In my case, the Job Manager would fail with the log output about unreachability (with an additional statement about something going "horribly wrong"). Under workload pressure, I reverted to 1.3.2 where everything works perfectly, but we will try again soon on 1.4. When we do I will post the actual log output. This was on YARN in AWS, with akka.ask.timeout = 60s. On Wed, Jan 24, 2018 at 9:57 PM, Ashish Pokharel <[hidden email]> wrote:
|
Hi, this sounds like a serious regression wrt Flink 1.3.2 and we should definitely find out what's causing this problem.

From what I see in the logs, the following happens: For some time the JobManager seems to no longer receive heartbeats from the TaskManager. This could be, for example, due to long GC pauses or heavy load which starves the ActorSystem's threads which are responsible for sending the heartbeats. Due to this, the TM's ActorSystem is quarantined, which effectively renders it useless because the JM will henceforth ignore all messages from that system. The only way to resolve this problem is to restart the ActorSystem.

By setting taskmanager.exit-on-fatal-akka-error to true in flink-conf.yaml, a quarantined TM will shut down. If you run the Flink cluster on Yarn, then a new substitute TM will be started if you still have some container restarts left. That way, the system should be able to recover.

Additionally you could try to play around with akka.watch.heartbeat.interval and akka.watch.heartbeat.pause, which control the heartbeat interval and the acceptable pause. By increasing the latter, the system should tolerate longer GC pauses and periods of high load.

However, this only addresses the symptoms of the problem and I'd like to find out what's causing it. In order to further debug the problem, it would be really helpful to obtain the logs of the JobManager and the TaskManagers on DEBUG log level and with taskmanager.debug.memory.startLogThread set to true. Additionally it would be interesting to see what's happening on the TaskManagers when you observe high load. So obtaining a profiler dump via VisualVM would be great.

And last but not least, it also helps to learn more about the job you're running. What kind of connectors is it using? Are you using Flink's metric system? How is the Flink cluster deployed? Which other libraries are you using in your job? Thanks a lot for your help!
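The settings mentioned above could be collected in flink-conf.yaml roughly as follows. This is only a sketch: the keys are the ones named in this message, but the two heartbeat values are illustrative assumptions, not tuned recommendations, and should be adapted to the pauses actually observed.

```yaml
# flink-conf.yaml (sketch; heartbeat values are illustrative assumptions)

# Let a quarantined TaskManager shut down so that it can be replaced.
taskmanager.exit-on-fatal-akka-error: true

# Heartbeat interval and acceptable pause for Akka's death watch.
# A larger pause tolerates longer GC pauses and load spikes.
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s

# Periodically log memory usage to help with debugging.
taskmanager.debug.memory.startLogThread: true
```

Note that the first setting only helps if something restarts the TaskManager afterwards, for example Yarn with container restarts left.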
Cheers, Till On Tue, Jan 30, 2018 at 8:59 PM, Cliff Resnick <[hidden email]> wrote:
|
Hi Till,
Thanks for the detailed response. I will try to gather some of this information during the week and follow up. -- Ashish
|
In reply to this post by Till Rohrmann
@Jelmer, this is Till's last response on the issue.
-- Ashish
|
Till, We ran into the same issue. It started with a high GC pause that caused the jobmanager to lose its zk connection and leadership, and caused the jobmanager to quarantine the taskmanager in akka. Once quarantined, the akka association between jobmanager and taskmanager is locked forever. Your suggestion of " taskmanager.exit-on- Thanks, Steven On Sat, Feb 24, 2018 at 7:02 AM, ashish pok <[hidden email]> wrote:
|
Hi Steven, the reason why we did not turn on this feature by default was that in case of a true JM failure, all of the TMs will think that they got quarantined, which triggers their shutdown. Depending on how many container restarts you have left on Yarn, for example, this can lead to a situation where Flink is not able to recover the job even though it only needed to restart the JM container. Cheers, Till On Wed, Apr 25, 2018 at 10:39 PM, Steven Wu <[hidden email]> wrote:
|
Till, thanks for the clarification. Yes, that situation is undesirable too. In our case, restarting the jobmanager could also recover the job from the akka association lock-out. It was actually the issue (high GC pause) on the jobmanager side that caused the akka failure. Do we have something like "jobmanager.exit-on-fatal- Thanks, Steven On Sun, May 13, 2018 at 1:12 PM, Till Rohrmann <[hidden email]> wrote:
|
Hi Steven, we don't have `jobmanager.exit-on-fatal-akka-error` because then the JM would also be killed if a single TM gets quarantined. This is also not a desired behaviour. With Flink 1.5 the problem with quarantining should be gone since we don't rely anymore on Akka's death watch and instead use our own heartbeats. Cheers, Till On Mon, May 14, 2018 at 1:07 AM, Steven Wu <[hidden email]> wrote:
|
Till, thanks for the follow-up. looking forward to 1.5 :) On Fri, May 25, 2018 at 2:11 AM, Till Rohrmann <[hidden email]> wrote:
|
A clarification: in 1.5, with custom heartbeats, are there additional configurations we should be concerned about? On Fri, May 25, 2018 at 10:17 AM, Steven Wu <[hidden email]> wrote:
|
Hi Vishal, you should not need to configure anything else. Cheers, Till On Sat, Jun 30, 2018 at 7:23 PM Vishal Santoshi <[hidden email]> wrote:
|