How to submit flink jars from plain Java programs?

How to submit flink jars from plain Java programs?

Rabe, Jens
Hello,

I have a web application that works against a Hadoop cluster in the background. The application is based on Spring Boot and is packaged via the spring-boot-maven-plugin. This plugin works similarly to the maven-assembly-plugin in that it puts the dependencies as nested jars into the final output jar. For ordinary Hadoop MapReduce jobs, I add them as dependencies to my application so they are included in the final jar.
I now create a new Hadoop Configuration (simply via new Configuration()) and add all Hadoop configuration XML files for my cluster as resources to it (conf.addResource()), and additionally, I set "fs.hdfs.impl" to DistributedFileSystem.class.
With this Configuration, I can access the HDFS and submit MapReduce jobs from my web app just fine.
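
In code, that setup looks roughly like this (the XML paths are placeholders for my cluster's configuration files):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Build a Configuration from the cluster's config files and force the
// HDFS implementation so the hdfs:// scheme resolves inside the fat jar.
Configuration conf = new Configuration();
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"));
conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());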

How do I achieve a similar behaviour with Flink?

Re: How to submit flink jars from plain Java programs?

Max Michels
Hi Jens,

If I understand correctly, you are looking for a method to submit
jar-packaged Flink jobs to the job manager. If you have the job
manager running at "example.com" on port 6123, then you can submit a
job like this:


import org.apache.flink.client.RemoteExecutor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.InetSocketAddress;

public class MyTestsJava {
    public static void main(String[] args) {
        String path = "/path/to/flink/job.jar";
        String in = "/tmp/input";
        String out = "/tmp/output";
        submitJar(path, in, out);
    }

    public static void submitJar(String path, String... args) {
        File file = new File(path);

        int parallelism = 1;
        boolean wait = true; // block until the job has finished

        try {
            // Wrap the jar; args are passed to the program's main method
            PackagedProgram program = new PackagedProgram(file, args);

            // Address of the running job manager
            InetSocketAddress jobManagerAddress =
                    RemoteExecutor.getInetFromHostport("example.com:6123");

            // The client ships the user code to the job manager
            Client client = new Client(jobManagerAddress,
                    new Configuration(), program.getUserCodeClassLoader());

            System.out.println("Executing " + path);
            client.run(program, parallelism, wait);

        } catch (ProgramInvocationException e) {
            e.printStackTrace();
        }
    }
}


If you have set up Flink correctly, the submitted job can also access
HDFS, as in the sketch below. Let me know if this is what you had in mind.
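
A minimal sketch of reading from HDFS inside the submitted job (the namenode host, port, and path are placeholders):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Inside the job: read the input via a fully qualified HDFS path.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://namenode:9000/tmp/input");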

Best regards,
Max

Re: How to submit flink jars from plain Java programs?

Stephan Ewen
Max's answer is the best approach for pre-packaged Jar programs.

In addition, you have the RemoteEnvironment in Flink. It allows you to write constructs like this:


import org.apache.flink.api.java.ExecutionEnvironment;

public class MyProgram {

    private static final String JAR_FILE_PATH = "/path/to/jar";

    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);

        // Create an environment that executes the program on the remote
        // cluster, shipping the given jar file along with the job.
        ExecutionEnvironment env =
                ExecutionEnvironment.createRemoteEnvironment(host, port, JAR_FILE_PATH);

        // regular program

        env.execute();
    }
}



This code will also take the program and execute it on the cluster, using the classes in the jar file.
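
To make the "// regular program" placeholder concrete, any regular DataSet program can go there. A minimal sketch with placeholder HDFS paths:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;

DataSet<String> text = env.readTextFile("hdfs://namenode:9000/tmp/input");

// Uppercase every line and write the result back to HDFS.
text.map(new MapFunction<String, String>() {
    @Override
    public String map(String line) {
        return line.toUpperCase();
    }
}).writeAsText("hdfs://namenode:9000/tmp/output");

Nothing runs at this point; the plan is only built on the client and shipped to the cluster when env.execute() is called.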

Both methods are useful, depending on your environment.

Greetings,
Stephan

