My project automatically generates a Flink job jar, submits it to the YARN cluster for execution, and retrieves the ApplicationId. I find that after execution, the following error is reported:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at flink.SubmitDemo.submit(SubmitDemo.java:81)
    at flink.SubmitDemo.main(SubmitDemo.java:60)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1526888270443_0073 failed 2 times due to AM Container for appattempt_1526888270443_0073_000002 exited with exitCode: -1000
For more detailed output, check application tracking page: http://worker1:8088/cluster/app/application_1526888270443_0073 Then, click on links to logs of each attempt.
Diagnostics: File file:/home/demo/.flink/application_1526888270443_0073/application_1526888270443_0073-flink-conf.yaml1424389156105771296.tmp does not exist
java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0073/application_1526888270443_0073-flink-conf.yaml1424389156105771296.tmp does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue: yarn logs -applicationId application_1526888270443_0073
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)

I searched for the error on Google and found that the cause was a missing Hadoop environment variable. However, my jar is not submitted through the ./bin/flink script; I submit it through CliFrontend.java directly. How can I solve this problem? My code looks like:

public void submit(String[] args) throws Exception {
    final String configurationDirectory = "/usr/ndp/current/yarn_client/conf";
    File configFile = new File("/home/demo/flink-1.5.1/conf");
    final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
    FlinkYarnSessionCli cli = new FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y", "yarn");
    final List<CustomCommandLine<?>> customCommandLines =
            CliFrontend.loadCustomCommandLines(flinkConfiguration, configurationDirectory);
    CliFrontend testFrontend = new CliFrontend(flinkConfiguration, customCommandLines);
    // submit
    testFrontend.parseParameters(args);
    CommandLine commandLine = CliFrontendParser.parse(
            CliFrontendParser.getRunCommandOptions(), args, true);
    final ApplicationId clusterId = cli.getClusterId(commandLine);
    System.out.println("ApplicationId=" + clusterId.toString());
}
I don't think your exception / code was attached. We have an open-source project [2] with a similar requirement of submitting compiled jobs on YARN, where we directly extend Flink's AbstractYarnClusterDescriptor. Maybe it can be a reference for you.

Thanks,

On Tue, Aug 14, 2018 at 7:58 PM spoon_lz <[hidden email]> wrote:
> My project is to automatically generate flink's code jar and then submit it
Sorry, I don't know why the code and error are not visible.
The error is:

The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at flink.SubmitDemo.submit(SubmitDemo.java:75)
    at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1526888270443_0090 failed 2 times due to AM Container for appattempt_1526888270443_0090_000002 exited with exitCode: -1000
For more detailed output, check application tracking page: http://cluster1:8088/cluster/app/application_1526888270443_0090 Then, click on links to logs of each attempt.
Diagnostics: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue: yarn logs -applicationId application_1526888270443_0090
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
    ... 5 more

And my code looks like:

public class SubmitDemo {

    private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
    private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
    private static final String JAR_FILE = "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";

    public static void main(String[] args) {
        SubmitDemo demo = new SubmitDemo();
        demo.before();
        List<String> parameters = new ArrayList<>();
        parameters.add("run");
        parameters.add("-d");
        parameters.add("-m");
        parameters.add("yarn-cluster");
        parameters.add("-ynm");
        parameters.add("lz_test_alone");
        parameters.add("-yn");
        parameters.add("4");
        parameters.add("-ytm");
        parameters.add("4096");
        parameters.add("-yjm");
        parameters.add("1024");
        parameters.add("-c");
        parameters.add("flink.Demo");
        parameters.add(JAR_FILE);
        try {
            demo.submit(parameters.toArray(new String[parameters.size()]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void submit(String[] args) throws Exception {
        final String configurationDirectory = ENV_CONF;
        File configFile = new File(FLINK_CONF);
        final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());
        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y", "yarn");
        final List<CustomCommandLine<?>> customCommandLines =
                CliFrontend.loadCustomCommandLines(flinkConfiguration, configurationDirectory);
        CliFrontend testFrontend = new CliFrontend(flinkConfiguration, customCommandLines);
        // submit
        testFrontend.parseParameters(args);
        CommandLine commandLine = CliFrontendParser.parse(
                CliFrontendParser.getRunCommandOptions(), args, true);
        final ApplicationId clusterId = cli.getClusterId(commandLine);
        System.out.println("ApplicationId=" + clusterId.toString());
    }

    // SET HADOOP ENV
    private void before() {
        Map<String, String> newenv = Maps.newHashMap();
        newenv.put("HADOOP_CONF_DIR", ENV_CONF);
        newenv.put("YARN_CONF_DIR", ENV_CONF);
        try {
            Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
            Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
            theEnvironmentField.setAccessible(true);
            Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
            env.putAll(newenv);
            Field theCaseInsensitiveEnvironmentField =
                    processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
            theCaseInsensitiveEnvironmentField.setAccessible(true);
            Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
            cienv.putAll(newenv);
        } catch (NoSuchFieldException e) {
            // fall back to patching the map behind System.getenv()
            try {
                Class[] classes = Collections.class.getDeclaredClasses();
                Map<String, String> env = System.getenv();
                for (Class cl : classes) {
                    if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
                        Field field = cl.getDeclaredField("m");
                        field.setAccessible(true);
                        Object obj = field.get(env);
                        Map<String, String> map = (Map<String, String>) obj;
                        map.clear();
                        map.putAll(newenv);
                    }
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

The error is that the file "/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp" is not found, but I can find this file myself. Previously I thought it was an environment variable problem and added the before() method, but the same error is still reported.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
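[Editor's note] The reflection-based before() method above mutates JVM-internal maps (java.lang.ProcessEnvironment and the map behind System.getenv()), which is fragile and can break on newer JDKs with stronger module encapsulation. A safer pattern, sketched here as an editorial aside rather than something proposed in the thread (the ChildEnvDemo class name is invented), is to run the submitting code in a child process and set HADOOP_CONF_DIR through ProcessBuilder, whose environment map is a supported, mutable API:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class ChildEnvDemo {

    // Launches a child shell that echoes the given environment variable,
    // demonstrating that the value injected via ProcessBuilder is visible
    // to the child process without any reflection hacks.
    public static String childEnv(String key, String value) throws Exception {
        ProcessBuilder pb = new ProcessBuilder("sh", "-c", "printf '%s' \"$" + key + "\"");
        pb.environment().put(key, value); // affects only the child process
        Process p = pb.start();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line = r.readLine();
            p.waitFor();
            return line;
        }
    }

    public static void main(String[] args) throws Exception {
        // Conf dir taken from the thread; adjust for your cluster.
        System.out.println(childEnv("HADOOP_CONF_DIR", "/usr/ndp/current/yarn_client/conf"));
    }
}
```

In this pattern the CliFrontend-based submitter would be launched as the child JVM with HADOOP_CONF_DIR and YARN_CONF_DIR set on its ProcessBuilder, instead of rewriting the parent JVM's cached environment.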
Hi,
Is this path accessible on the container? If not, use some distributed file system or NFS, or the -yt / --yarnship option of the CLI. Please also take a look at https://lists.apache.org/thread.html/%3CCAF=1nJ8GONoqux7czxpUxAf7L3p=-E_ePSTHk0uWa=GRyG=2nw@...%3E

Piotrek
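[Editor's note] Piotrek's first question, whether the path is accessible, can be checked up front on each machine involved. A minimal sketch (the PathCheck class name is invented for illustration) that verifies a directory exists and is readable by the current user:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PathCheck {

    // True only if the path is an existing directory readable by the current user.
    public static boolean accessible(String dir) {
        Path p = Paths.get(dir);
        return Files.isDirectory(p) && Files.isReadable(p);
    }

    public static void main(String[] args) {
        String dir = args.length > 0 ? args[0] : "/home/demo/.flink";
        System.out.println(dir + " accessible: " + accessible(dir));
    }
}
```

Note that the failing path in the trace is a file: URI, i.e. the node-local filesystem, so a check that passes on the client machine can still fail inside a YARN container running on another node; that is exactly the gap a distributed filesystem or --yarnship closes.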