Hi,
I'm using Flink 1.12.0 running on Hadoop YARN. After setting the following HA-related properties in flink-conf.yaml,

  high-availability: zookeeper
  high-availability.zookeeper.path.root: /recovery
  high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181
  high-availability.storageDir: hdfs:///flink/recovery

the following command hangs and eventually fails:

  $ flink list --target yarn-per-job -Dyarn.application.id=$application_id

Before setting the properties, I see the following output after executing the above command:

2021-01-06 00:11:48,961 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to deploy (auth:SIMPLE)
2021-01-06 00:11:48,968 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-8522045433029410483.conf.
2021-01-06 00:11:48,976 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
2021-01-06 00:11:49,316 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at nm02/10.93.0.91:10200
2021-01-06 00:11:49,324 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-01-06 00:11:49,333 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-01-06 00:11:49,404 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface dn03:37098 of application 'application_1600163418174_0127'.
2021-01-06 00:11:49,758 INFO org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
Waiting for response...
2021-01-06 00:11:49,863 INFO org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs
------------------ Running/Restarting Jobs -------------------
31.12.2020 01:22:34 : 76fc265c44ef44ae343ab15868155de6 : stream calculator (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

After setting the properties:

2021-01-06 00:06:38,971 INFO org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to deploy (auth:SIMPLE)
2021-01-06 00:06:38,976 INFO org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-3613274701724362777.conf.
2021-01-06 00:06:38,982 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
2021-01-06 00:06:39,304 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at nm02/10.93.0.91:10200
2021-01-06 00:06:39,312 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-01-06 00:06:39,320 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2021-01-06 00:06:39,388 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface dn03:37098 of application 'application_1600163418174_0127'.
2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Enforcing default ACL for ZK connections
2021-01-06 00:06:39,399 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Using '/recovery/default' as Zookeeper namespace.
2021-01-06 00:06:39,425 INFO org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Running in ZooKeeper 3.4.x compatibility mode
2021-01-06 00:06:39,425 INFO org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Using emulated InjectSessionExpiration
2021-01-06 00:06:39,447 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Starting
2021-01-06 00:06:39,455 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Initiating client connection, connectString=nm01:2181,nm02:2181,nm03:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@7668d560
2021-01-06 00:06:39,466 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Default schema
2021-01-06 00:06:39,466 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-3613274701724362777.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2021-01-06 00:06:39,467 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Opening socket connection to server nm01/10.93.0.32:2181
2021-01-06 00:06:39,467 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Socket connection established to nm01/10.93.0.32:2181, initiating session
2021-01-06 00:06:39,467 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2021-01-06 00:06:39,477 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server nm01/10.93.0.32:2181, sessionid = 0x176d1f2c2280016, negotiated timeout = 60000
2021-01-06 00:06:39,478 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
2021-01-06 00:06:39,658 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Starting DefaultLeaderRetrievalService with ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}.
2021-01-06 00:06:39,667 INFO org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
Waiting for response...
# here it took almost 30 seconds
2021-01-06 00:07:09,670 INFO org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - Stopping DefaultLeaderRetrievalService.
2021-01-06 00:07:09,670 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/rest_server_lock'}.
2021-01-06 00:07:09,671 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - backgroundOperationsLoop exiting
2021-01-06 00:07:09,679 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper [] - Session: 0x176d1f2c2280016 closed
2021-01-06 00:07:09,679 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - EventThread shut down for session: 0x176d1f2c2280016
2021-01-06 00:07:09,680 ERROR org.apache.flink.client.cli.CliFrontend [] - Error while running the command.
org.apache.flink.util.FlinkException: Failed to retrieve job list.
	at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_222]
	at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_222]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) [hadoop-common-3.1.1.3.1.4.0-315.jar:?]
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047) [flink-dist_2.11-1.12.0.jar:1.12.0]
Caused by: java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_222]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_222]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_222]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_222]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_222]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_222]

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.util.FlinkException: Failed to retrieve job list.
	at org.apache.flink.client.cli.CliFrontend.listJobs(CliFrontend.java:436)
	at org.apache.flink.client.cli.CliFrontend.lambda$list$0(CliFrontend.java:418)
	at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:919)
	at org.apache.flink.client.cli.CliFrontend.list(CliFrontend.java:415)
	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:977)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.util.concurrent.TimeoutException
	at org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1168)
	at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
	at org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:549)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Why is the ZooKeeper quorum configured for HA contacted during this command, and is there a way to avoid this behavior?

Best,
Dongwon
Hi Dongwon,

I think the root cause is that the GenericCLI does not override "high-availability.cluster-id" with the specified application id. The GenericCLI is activated by "--target yarn-per-job". In the FlinkYarnSessionCli we do this, so the following command should work with or without ZooKeeper HA configured:

  ./bin/flink list -m yarn-cluster -yid $applicationId

You could also specify "high-availability.cluster-id" explicitly so that leader retrieval can find the correct JobManager address:

  flink list --target yarn-per-job -Dyarn.application.id=$application_id -Dhigh-availability.cluster-id=$application_id

BTW, this is not a newly introduced behavior change in Flink 1.12. I believe it does not work in 1.11 and 1.10 either.

Best,
Yang

Dongwon Kim <[hidden email]> wrote on Tue, Jan 5, 2021, 11:22 PM:
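To make the effect concrete: with the configuration from the first mail, the client builds its ZooKeeper namespace as <high-availability.zookeeper.path.root>/<high-availability.cluster-id>, i.e. /recovery/default unless the cluster-id is overridden, while the per-job cluster registers its leader information under the application id. A rough way to check this from a ZooKeeper shell (illustrative only; it assumes zkCli.sh is available on a quorum host, and the exact znode layout may differ between Flink versions):

  # Namespace the CLI uses when the cluster-id is NOT overridden -- nothing is
  # registered here for the per-job cluster, so leader retrieval times out:
  zkCli.sh -server nm01:2181 ls /recovery/default

  # Namespace the per-job cluster actually uses (cluster-id = application id),
  # where leader znodes such as leader/rest_server_lock should show up:
  zkCli.sh -server nm01:2181 ls /recovery/application_1600163418174_0127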
Hi Yang,

I was wondering why the CLI accessed ZK at all because, as shown in the following lines, the CLI seemed to already know the address of the JM by contacting the AHS before connecting to ZK.

2021-01-06 18:35:32,351 INFO org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
2021-01-06 18:35:32,682 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at mobdata-devflink-nm02.dakao.io/10.93.0.91:10200
2021-01-06 18:35:32,763 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface mobdata-devflink-dn03.dakao.io:37098 of application 'application_1600163418174_0127'.
2021-01-06 18:35:32,773 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Enforcing default ACL for ZK connections
2021-01-06 18:35:32,774 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Using '/driving-habits/default' as Zookeeper namespace.

Anyway, the CLI needs to find where the leader (= active) JM is located via a ZK node, so the GenericCLI has to be informed of "high-availability.cluster-id". Thanks for the heads up!

> You could also specify the "high-availability.cluster-id" so that leader retrieval could get the correct JobManager address.

Okay, I checked that it works. Thank you very much :-) It would be nice for other users if your answer were also explained on [1].

One more thing: I'm confused because there are different ways to specify YARN per-job clusters:
- "--target yarn-per-job" is explained in the current documentation [1] and looks like the most recent one, so I'd rather use this one with "-Dhigh-availability.cluster-id=$application_id".
- Is "-m yarn-cluster" (i.e. "--jobmanager yarn-cluster") a preferred way of specifying per-job clusters, and is it going to co-exist with "--target yarn-per-job" in future releases? It looks old-school to me.
- There's also "--executor yarn-per-job", which seems like it will be deprecated soon (explained in "flink help").

Best,
Dongwon

On Wed, Jan 6, 2021 at 12:33 PM Yang Wang <[hidden email]> wrote:
Hi Dongwon,

Please find the answers inline.

> Why does the CLI access ZK?
This is a good question. Currently, when HA is enabled, even though we could get the JobManager REST endpoint from the YARN application report, we still have to retrieve the leader information from ZooKeeper. Please find more information in the class RestClusterClient. I am not aware of any potential issues with retrieving the REST endpoint directly from the YARN application report, so I think this could be a minor improvement.

> Specify "high-availability.cluster-id" to list jobs
I have created a ticket for updating the documentation [1].

> About "-m yarn-cluster"
You are right. "--target yarn-per-job" is the recommended way to start a per-job cluster. The backing CLI option parser is the GenericCLI; it is also used for application mode and the K8s deployment. "-m yarn-cluster" is the old way, with all CLI options parsed by the FlinkYarnSessionCli. Since it is widely used, it cannot be deprecated or removed very soon. "--executor" has exactly the same effect as "--target"; the only difference is the naming.

Best,
Yang

Dongwon Kim <[hidden email]> wrote on Wed, Jan 6, 2021 at 6:49 PM:
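Putting the three option styles together, using the application id from this thread as a placeholder (illustrative only; the high-availability.cluster-id override is the workaround discussed above and is only needed for the GenericCLI-based forms when ZooKeeper HA is configured):

  # Recommended: GenericCLI, activated by --target
  ./bin/flink list --target yarn-per-job \
      -Dyarn.application.id=application_1600163418174_0127 \
      -Dhigh-availability.cluster-id=application_1600163418174_0127

  # Old style: FlinkYarnSessionCli, which overrides the cluster-id itself
  ./bin/flink list -m yarn-cluster -yid application_1600163418174_0127

  # Same effect as --target, only the option name differs
  ./bin/flink list --executor yarn-per-job \
      -Dyarn.application.id=application_1600163418174_0127 \
      -Dhigh-availability.cluster-id=application_1600163418174_0127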
Thanks Yang for the very detailed explanation! Wow, I really appreciate it.

Best,
Dongwon

On Wed, Jan 6, 2021 at 10:17 PM Yang Wang <[hidden email]> wrote: