Hello everyone,
I am testing High Availability of Flink on YARN on an AWS EMR cluster. My setup is an EMR cluster with one master node and 3 core nodes (each with 16 vCores). ZooKeeper is running on all nodes. The YARN session was created with:

    flink-yarn-session -n 2 -s 8 -jm 1024m -tm 20g

and a job with parallelism of 16 was submitted.

I tried to run the test by terminating the core node on which the JobManager was running (using Linux "init 0"). The first few restarts worked well - a new JobManager was elected and the job resumed properly. However, after some restarts, the new JobManager could no longer acquire the resources it needed (only one TM, on the node with IP .81, was shown in the Task Managers GUI).
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Flink.png>

I kept getting the error message "org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate all requires slots within timeout of 300000 ms. Slots required: 108, slots allocated: 60".

Below is what is shown in the YARN Resource Manager:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Yarn.png>

As per that screenshot, it looks like there are 2 TaskManagers still running (one on each of hosts .88 and .81), which would mean the one on .88 has not been cleaned up properly. If so, how can I clean it up?

I also wonder: when the server with the JobManager crashes, is the whole job restarted, or will a new JobManager try to connect to the running TMs and resume the job?

Thanks and regards,
Averell
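PS: For reference, a minimal sketch of the HA-related settings such a YARN/ZooKeeper setup would typically carry in flink-conf.yaml. The hostnames, paths and attempt count below are placeholders, not taken from my actual cluster:

    # flink-conf.yaml (sketch, values are placeholders)
    high-availability: zookeeper
    high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181,zk-host-3:2181
    high-availability.storageDir: hdfs:///flink/ha/
    high-availability.zookeeper.path.root: /flink
    # allow the YARN application master to be restarted on failure
    yarn.application-attempts: 5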
Hi Averell,

What Flink version are you using? Can you attach the full logs from the JM and TMs?

Since Flink 1.5, the -n parameter (number of TaskManagers) should be omitted unless you are running in legacy mode [1].

> As per that screenshot, it looks like there are 2 TaskManagers still
> running (one on each of hosts .88 and .81), which would mean the one on .88
> has not been cleaned up properly. If so, how can I clean it up?

The TMs should terminate if they cannot register at the JM [2].

> I also wonder: when the server with the JobManager crashes, is the whole
> job restarted, or will a new JobManager try to connect to the running TMs
> and resume the job?

The whole job is restarted, but any existing TM containers are reused.

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#legacy
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#taskmanager-registration-timeout
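PS: Concretely, the session command without -n would be (same memory/slot values as yours, only the -n flag dropped):

    flink-yarn-session -s 8 -jm 1024m -tm 20g

The registration timeout from [2] can be tuned in flink-conf.yaml if needed; the value below is only an illustration, not a recommendation:

    taskmanager.registration-timeout: 5 min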
Hi Gary,
Thanks for your support. I am using Flink 1.7.0. I will try to test without that -n.

Below are the JM log (on server .82) and the TM log (on server .88). I'm sorry I missed that TM log before asking - I thought it would not be relevant. I have just fixed the issue with the connection to ZooKeeper, and the problem was solved.

Then I have another question: when the JM cannot start/connect to the TM on .88, why didn't it try on .82, where resources are still available?

Thanks and regards,
Averell

Here is the JM log (from /mnt/var/log/hadoop-yarn/.../jobmanager.log on .82). It seems irrelevant - even the earlier NoResourceAvailable message shown in the GUI was not found in the jobmanager.log file:

2019-01-23 04:15:01.869 [main] WARN  org.apache.flink.configuration.Configuration - Config uses deprecated configuration key 'web.port' instead of proper key 'rest.port'
2019-01-23 04:15:03.483 [main] WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Upload directory /tmp/flink-web-08279f45-0244-4c5c-bc9b-299ac59b4068/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.

And here is the TM log:

2019-01-23 11:07:07.479 [main] ERROR o.a.flink.shaded.curator.org.apache.curator.ConnectionState - Connection timed out for connection string (localhost:2181) and timeout (15000) / elapsed (56538)
org.apache.flink.shaded.curator.org.apache.curator.CuratorConnectionLossException: KeeperErrorCode = ConnectionLoss
    at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
    at org.apache.flink.shaded.curator.org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
    at org.apache.flink.shaded.curator.org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl$1.call(NamespaceImpl.java:90)
    at org.apache.flink.shaded.curator.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:109)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.NamespaceImpl.fixForNamespace(NamespaceImpl.java:83)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.fixForNamespace(CuratorFrameworkImpl.java:594)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:158)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:32)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.reset(NodeCache.java:242)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:175)
    at org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache.start(NodeCache.java:154)
    at org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService.start(ZooKeeperLeaderRetrievalService.java:107)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.start(TaskExecutor.java:277)
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:168)
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:332)
    at org.apache.flink.yarn.YarnTaskExecutorRunner.lambda$run$0(YarnTaskExecutorRunner.java:142)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.YarnTaskExecutorRunner.run(YarnTaskExecutorRunner.java:141)
    at org.apache.flink.yarn.YarnTaskExecutorRunner.main(YarnTaskExecutorRunner.java:75)
2019-01-23 11:07:08.224 [main-SendThread(localhost:2181)] WARN  o.a.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
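PS: The ZooKeeper fix mentioned above was essentially changing the connection string in flink-conf.yaml from localhost to the actual ensemble. Roughly the following; the hostnames are placeholders for my EMR nodes, not the real names:

    # before: every TM tried to reach a ZooKeeper on its own host
    high-availability.zookeeper.quorum: localhost:2181
    # after: point all JMs/TMs at the real ensemble
    high-availability.zookeeper.quorum: node-81:2181,node-82:2181,node-88:2181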
Hi Averell,

> Then I have another question: when the JM cannot start/connect to the TM
> on .88, why didn't it try on .82, where resources are still available?

When you are deploying on YARN, the TM container placement is decided by the YARN scheduler, not by Flink. Without seeing the complete logs, it is difficult to tell what happened. If you need help with debugging, please enable YARN's log aggregation and attach the output of:

    yarn logs -applicationId <APP_ID>

Do I understand correctly that your problem was solved by changing the ZooKeeper connection string?

Best,
Gary
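PS: In case log aggregation is not already on in your EMR cluster, it is a yarn-site.xml switch (sketch; the NodeManagers typically need a restart after changing it):

    <!-- yarn-site.xml -->
    <property>
      <name>yarn.log-aggregation-enable</name>
      <value>true</value>
    </property>

after which the aggregated logs of a finished or killed application can be pulled with, for example (application id is a placeholder):

    yarn logs -applicationId application_1548201234567_0001 > flink.log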
Hi Gary,
Yes, the problem mentioned in my original post has been resolved by correcting the ZooKeeper connection string.

I have two other related questions; if you have time, please help:

1. Regarding JM high availability: when I shut down the host on which the JM was running, YARN would detect the missing JM and start a new one after 10 minutes, and the Flink job would be restored. However, on the console from which I submitted the job, I got the following error message: "The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException" (full stack trace in the attached file flink_console_timeout.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink_console_timeout.log>). Is there any way to avoid this? If I run this as an AWS EMR job, the job would be considered failed, while it is actually restored automatically by YARN after 10 minutes.

2. Regarding logging: could you please explain the source of the error messages shown in the "Exceptions" tab of the Flink job GUI (as per the screenshot below)? I could not find any log file containing that message (not in jobmanager.log nor in taskmanager.log in EMR's hadoop-yarn logs folder).
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/Screen_Shot_2019-01-25_at_22.png>

Thanks and regards,
Averell
Hi Averell,

> Is there any way to avoid this? If I run this as an AWS EMR job, the job
> would be considered failed, while it is actually restored automatically by
> YARN after 10 minutes.

You are writing that it takes YARN 10 minutes to restart the application master (AM). However, in my experiments the AM container is restarted within a few seconds after killing the process. If in your setup YARN actually needs 10 minutes to restart the AM, then you could try increasing the number of retry attempts by the client [2].

> Regarding logging: could you please explain the source of the error
> messages shown in the "Exceptions" tab of the Flink job GUI (as per the
> screenshot below)?

The REST API that is queried by the Web UI returns the root cause from the ExecutionGraph [3]. All job status transitions should be logged together with the exception that caused the transition [4]. Check for INFO level log messages that start with "Job [...] switched from state" followed by a stacktrace. If you cannot find the exception, the problem might be rooted in your log4j or logback configuration.

Best,
Gary

[1] https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java#L767
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#rest-retry-max-attempts
[3] https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L87
[4] https://github.com/apache/flink/blob/81acd0a490f3ac40cbb2736189796138ac109dd0/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java#L1363
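PS: A sketch of the client-side retry settings from [2] in flink-conf.yaml. The numbers are only an illustration of raising the retry budget enough to cover a 10 minute AM restart, not recommended values:

    rest.retry.max-attempts: 60    # how often the client retries before giving up
    rest.retry.delay: 15000        # milliseconds to wait between two retries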
Hi Gary,
Thanks for the help.

Gary Yao wrote
> You are writing that it takes YARN 10 minutes to restart the application
> master (AM). However, in my experiments the AM container is restarted
> within a few seconds after killing the process. If in your setup YARN
> actually needs 10 minutes to restart the AM, then you could try increasing
> the number of retry attempts by the client [2].

I think that comes from the difference in how we tested. When I killed the JM process (using kill -9 pid), a new process got created within a few seconds. However, when I tested by crashing the server (using init 0), it needed some time. I found the yarn-site parameter for that timer: yarn.am.liveness-monitor.expiry-interval-ms, which defaults to 10 minutes [1].

I increased the REST client configuration as you suggested, and it did help.

Gary Yao wrote
> The REST API that is queried by the Web UI returns the root cause from the
> ExecutionGraph [3]. All job status transitions should be logged together
> with the exception that caused the transition [4]. Check for INFO level log
> messages that start with "Job [...] switched from state" followed by a
> stacktrace. If you cannot find the exception, the problem might be rooted
> in your log4j or logback configuration.

Thanks, I got the point. I am using logback. I tried to configure rolling logs, but no success yet. Will need to try more.

Thanks and regards,
Averell

[1] https://hadoop.apache.org/docs/r2.4.1/hadoop-yarn/hadoop-yarn-common/yarn-default.xml#yarn.am.liveness-monitor.expiry-interval-ms
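PS: For the rolling logs, what I am experimenting with is roughly the following logback.xml. This is only a sketch: ${log.file} is the property Flink passes to the containers, and the rollover pattern and retention are arbitrary choices on my side:

    <configuration>
      <appender name="rolling" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.file}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
          <!-- roll the file daily and keep one week of history -->
          <fileNamePattern>${log.file}.%d{yyyy-MM-dd}.gz</fileNamePattern>
          <maxHistory>7</maxHistory>
        </rollingPolicy>
        <encoder>
          <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} - %msg%n</pattern>
        </encoder>
      </appender>
      <root level="INFO">
        <appender-ref ref="rolling"/>
      </root>
    </configuration>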
Hi Gary,
I faced a similar problem yesterday, but I don't know the cause yet. The situation I observed is as follows:

- At about 2:57, one of my EMR execution nodes (IP ...99) got disconnected from the YARN Resource Manager (on the RM I could not see that node anymore), even though the node was still running. <<< This is another issue, but I believe it is with YARN.
- About 8 hours after that (between 10:00 and 11:00), I turned the problematic EMR core node off. AWS spun up another node and added it to the cluster as a replacement. The YARN RM soon recognized the new node and added it to its list of available nodes. However, the JM did not seem able to do anything after that. It kept trying to start the job, failing after the timeout with that "no resource available" exception again and again. No jobmanager logs were recorded after 2:57:15, though.

I am attaching the logs collected via "yarn logs -applicationId <appId>" here, but it seems I am still missing something.

I am using Flink 1.7.1, with the yarn-site configuration yarn.resourcemanager.am.max-attempts=5. The Flink configuration is all default values.

Thanks and best regards,
Averell

flink.log <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink.log>
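PS: If it matters, my understanding (an assumption on my side, not verified) is that the yarn-site value only caps what each application may request, so the Flink side would look like:

    # yarn-site.xml (shown here as key = value for brevity): cluster-wide maximum for AM restart attempts
    yarn.resourcemanager.am.max-attempts = 5
    # flink-conf.yaml: attempts requested by this Flink application, capped by the value above
    yarn.application-attempts: 5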
Hi Averell,

That log file does not look complete. I do not see any INFO level log messages such as [1].

Best,
Gary

[1] https://github.com/apache/flink/blob/46326ab9181acec53d1e9e7ec8f4a26c672fec31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java#L544
Hi Gary,
I am trying to reproduce that problem. BTW, is it possible to change the log level (I'm using logback) for a running job?

Thanks and regards,
Averell
Hi Averell,

Logback has this feature [1], but it is not enabled out of the box. You will have to enable the JMX agent by setting the com.sun.management.jmxremote system property [2][3]. I have not tried this out, though.

Best,
Gary

[1] https://logback.qos.ch/manual/jmxConfig.html
[2] https://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html#env-java-opts
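PS: An untested sketch of what that could look like. The JMX flags (port, disabled auth/ssl) are only an example, pick values that fit your environment:

    # flink-conf.yaml, see [3]
    env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false

    <!-- logback.xml: expose the logback configuration over JMX, see [1] -->
    <configuration>
      <jmxConfigurator />
      ...
    </configuration>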
Hi Gary,
Thanks for the suggestion.

How about changing the configuration of the Flink job itself at runtime? What I have to do now is take a savepoint, stop the job, change the configuration, and then restore the job from the savepoint. Is there an easier way to do that?

Thanks and regards,
Averell
Hi Averell,

The TM containers fetch the Flink binaries and config files from HDFS (or another DFS if configured) [1]. I think you should be able to change the log level by patching the logback configuration in HDFS and killing all Flink containers on all hosts. If you are running an HA setup, your cluster should be running with the new logback configuration afterwards.

Best,
Gary

[1] https://github.com/apache/flink/blob/02ff4bfe90d8e8b896c9f1a1bdbe8d43a48f5de7/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L691
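PS: A rough sketch of what I mean. The staging path below (.flink/<application id> under the submitting user's HDFS home) and the process names are assumptions you should verify on your own cluster:

    # locate the files Flink uploaded for the application
    hdfs dfs -ls /user/hadoop/.flink/application_1548201234567_0001/
    # overwrite the logback configuration in place
    hdfs dfs -put -f logback.xml /user/hadoop/.flink/application_1548201234567_0001/logback.xml
    # then, on each host, find and kill the Flink JVMs so YARN brings them back with the new config
    jps | grep -E 'YarnTaskExecutorRunner|YarnSessionClusterEntrypoint'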
Hi Gary,
Thanks for the answer. I had missed your most recent answer in this thread too. However, my last question

Averell wrote
> How about changing the configuration of the Flink job itself at runtime?
> What I have to do now is take a savepoint, stop the job, change the
> configuration, and then restore the job from the savepoint.

was about changing the job configuration (like parallelism, checkpoint location, checkpoint period, ...), not about logback.

Thanks and regards,
Averell
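PS: In case it clarifies the question, the manual cycle I currently go through looks roughly like this (paths, IDs and the new parallelism are placeholders):

    # 1. take a savepoint
    flink savepoint <jobId> hdfs:///flink/savepoints -yid <yarnAppId>
    # 2. stop the job
    flink cancel -yid <yarnAppId> <jobId>
    # 3. change the configuration (parallelism, checkpoint settings, ...)
    # 4. resubmit from the savepoint, e.g. with a new parallelism
    flink run -s hdfs:///flink/savepoints/savepoint-<id> -p 32 -yid <yarnAppId> my-job.jar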