How to submit flink job on yarn by java code

How to submit flink job on yarn by java code

spoon_lz
My project automatically generates a Flink job jar and submits it to the YARN cluster for execution, then retrieves the ApplicationId. After execution, the following error is reported:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at flink.SubmitDemo.submit(SubmitDemo.java:81)
    at flink.SubmitDemo.main(SubmitDemo.java:60)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1526888270443_0073 failed 2 times due to AM Container for appattempt_1526888270443_0073_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page: http://worker1:8088/cluster/app/application_1526888270443_0073 Then, click on links to logs of each attempt.
Diagnostics: File file:/home/demo/.flink/application_1526888270443_0073/application_1526888270443_0073-flink-conf.yaml1424389156105771296.tmp does not exist
java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0073/application_1526888270443_0073-flink-conf.yaml1424389156105771296.tmp does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1526888270443_0073
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
I searched for the error on Google and found that the cause was that the Hadoop environment variables were not set. However, my jar is not submitted through the ./bin/flink script; it is submitted through CliFrontend.java. How can I solve this problem? My code looks like:
public void submit(String[] args) throws Exception {

        final String configurationDirectory = "/usr/ndp/current/yarn_client/conf";

        File configFile = new File("/home/demo/flink-1.5.1/conf");

        final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());

        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y", "yarn");

        final List<CustomCommandLine<?>> customCommandLines = CliFrontend.loadCustomCommandLines(
                flinkConfiguration,
                configurationDirectory);

        CliFrontend testFrontend = new CliFrontend(flinkConfiguration, customCommandLines);
        // submit
        testFrontend.parseParameters(args);
        CommandLine commandLine = CliFrontendParser.parse(
                CliFrontendParser.getRunCommandOptions(),
                args,
                true);
        final ApplicationId clusterId = cli.getClusterId(commandLine);
        System.out.println("ApplicationId=" + clusterId);
}
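[Editor's note] The file:/ scheme in the diagnostics usually means the Flink client fell back to the local filesystem because the Hadoop configuration (fs.defaultFS in core-site.xml) was never picked up. A minimal pre-flight check along these lines can confirm whether the submitting JVM can even see a Hadoop config directory; the class and method names here are illustrative, not part of the poster's project:

```java
import java.io.File;
import java.util.Map;

class HadoopEnvCheck {

    // Returns true when the given environment map points the variable at an
    // existing directory that actually contains the named config file.
    static boolean hasUsableConfDir(Map<String, String> env, String var, String requiredFile) {
        String dir = env.get(var);
        return dir != null && new File(dir, requiredFile).isFile();
    }

    public static void main(String[] args) {
        // Run this in the same JVM (started the same way) as the submit code.
        boolean ok = hasUsableConfDir(System.getenv(), "HADOOP_CONF_DIR", "core-site.xml")
                || hasUsableConfDir(System.getenv(), "YARN_CONF_DIR", "yarn-site.xml");
        System.out.println(ok ? "Hadoop config visible" : "Hadoop config missing");
    }
}
```

If this prints "Hadoop config missing", one likely cause is that the environment was not visible when Hadoop's Configuration classes were initialized; exporting HADOOP_CONF_DIR before starting the JVM avoids the problem entirely.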
Re: How to submit flink job on yarn by java code

Rong Rong
I don't think your exception/code was attached.

In general, this largely depends on your setup. Are you trying to set up a long-running YARN session cluster, or are you trying to submit directly to a YARN job cluster? [1]
We have an open-sourced project [2] with a similar requirement of submitting compiled jobs on YARN, where we directly extend Flink's AbstractYarnClusterDescriptor.
Maybe it can be a reference for you.

Thanks,

On Tue, Aug 14, 2018 at 7:58 PM spoon_lz <[hidden email]> wrote:
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Re: How to submit flink job on yarn by java code

spoon_lz
Sorry, I don't know why the code and error are not visible.
The error is:
The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
        at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:239)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
        at flink.SubmitDemo.submit(SubmitDemo.java:75)
        at flink.SubmitDemo.main(SubmitDemo.java:50)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1526888270443_0090 failed 2 times due to AM Container for appattempt_1526888270443_0090_000002 exited with exitCode: -1000
For more detailed output, check application tracking page: http://cluster1:8088/cluster/app/application_1526888270443_0090 Then, click on links to logs of each attempt.
Diagnostics: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
java.io.FileNotFoundException: File file:/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
        at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
        at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:358)
        at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1526888270443_0090
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1059)
        at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:532)
        at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
        ... 5 more

and my code looks like:

public class SubmitDemo {

    private final String ENV_CONF = "/usr/ndp/current/yarn_client/conf";
    private final String FLINK_CONF = "/home/demo/flink-1.5.1/conf";
    private static final String JAR_FILE = "/home/demo/lz/flink1.5_demo-1.16-SNAPSHOT-jar-with-dependencies.jar";

    public static void main(String[] args) {
        SubmitDemo demo = new SubmitDemo();
        demo.before();
        List<String> parameters = new ArrayList<>();
        parameters.add("run");
        parameters.add("-d");
        parameters.add("-m");
        parameters.add("yarn-cluster");
        parameters.add("-ynm");
        parameters.add("lz_test_alone");
        parameters.add("-yn");
        parameters.add("4");
        parameters.add("-ytm");
        parameters.add("4096");
        parameters.add("-yjm");
        parameters.add("1024");
        parameters.add("-c");
        parameters.add("flink.Demo");
        parameters.add(JAR_FILE);

        try {
            demo.submit(parameters.toArray(new String[parameters.size()]));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void submit(String[] args) throws Exception {
        final String configurationDirectory = ENV_CONF;
        File configFile = new File(FLINK_CONF);
        final Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configFile.getAbsolutePath());

        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(flinkConfiguration, configurationDirectory, "y", "yarn");
        final List<CustomCommandLine<?>> customCommandLines = CliFrontend.loadCustomCommandLines(
                flinkConfiguration,
                configurationDirectory);
        CliFrontend testFrontend = new CliFrontend(flinkConfiguration, customCommandLines);

        // submit
        testFrontend.parseParameters(args);
        CommandLine commandLine = CliFrontendParser.parse(
                CliFrontendParser.getRunCommandOptions(),
                args,
                true);
        final ApplicationId clusterId = cli.getClusterId(commandLine);
        System.out.println("ApplicationId=" + clusterId);
    }

    // SET HADOOP ENV: mutate this JVM's environment map via reflection so that
    // HADOOP_CONF_DIR / YARN_CONF_DIR become visible to the Hadoop client classes.
    @SuppressWarnings("unchecked")
    private void before() {
        Map<String, String> newenv = new HashMap<>();
        newenv.put("HADOOP_CONF_DIR", ENV_CONF);
        newenv.put("YARN_CONF_DIR", ENV_CONF);
        try {
            Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment");
            Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment");
            theEnvironmentField.setAccessible(true);
            Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null);
            env.putAll(newenv);
            Field theCaseInsensitiveEnvironmentField = processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment");
            theCaseInsensitiveEnvironmentField.setAccessible(true);
            Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null);
            cienv.putAll(newenv);
        } catch (NoSuchFieldException e) {
            // Fallback for JVMs where ProcessEnvironment looks different:
            // rewrite the unmodifiable map behind System.getenv().
            try {
                Map<String, String> env = System.getenv();
                for (Class<?> cl : Collections.class.getDeclaredClasses()) {
                    if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
                        Field field = cl.getDeclaredField("m");
                        field.setAccessible(true);
                        Map<String, String> map = (Map<String, String>) field.get(env);
                        map.clear();
                        map.putAll(newenv);
                    }
                }
            } catch (ReflectiveOperationException e2) {
                throw new RuntimeException(e2);
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}


The error is that the file "/home/demo/.flink/application_1526888270443_0090/application_1526888270443_0090-flink-conf.yaml9192370388166197716.tmp" does not exist, but I can find this file.
Previously I thought it was an environment variable problem, so I added before(), but it still reports the same error.




Re: How to submit flink job on yarn by java code

Piotr Nowojski
Hi,

Is this path accessible on the container? If not, use some distributed file system, NFS, or the -yt/--yarnship option of the CLI.


Piotrek
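
[Editor's note] Piotr's -yt/--yarnship suggestion can be folded into the argument list that SubmitDemo already builds; a minimal sketch, where the shipped directory, main class, and jar paths are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

class YarnshipArgs {

    // Builds a "flink run" argument list; -yt (--yarnship) transfers the given
    // local directory to every YARN container so files in it are readable there.
    static List<String> buildArgs(String shipDir, String mainClass, String jarFile) {
        List<String> parameters = new ArrayList<>();
        parameters.add("run");
        parameters.add("-d");
        parameters.add("-m");
        parameters.add("yarn-cluster");
        parameters.add("-yt");      // ship this directory to the cluster
        parameters.add(shipDir);
        parameters.add("-c");
        parameters.add(mainClass);
        parameters.add(jarFile);
        return parameters;
    }
}
```

The resulting array can then be passed to the same CliFrontend.parseParameters call used elsewhere in this thread.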

On 16 Aug 2018, at 11:05, spoon_lz <[hidden email]> wrote:
