Flink Gelly dependency in a transient EMR cluster

Antonio Martínez Carratalá
Hello,

I'm trying to run a Flink job that processes graphs on a transient EMR cluster. Here is my code:

----------
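    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.elasticmapreduce.model.Application;
    import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
    import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
    import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
    import com.amazonaws.services.elasticmapreduce.model.StepConfig;

    // BUCKET_NAME, key, getEmrClient() and log are defined elsewhere in the enclosing class.
    HadoopJarStepConfig copyJarStepConf = new HadoopJarStepConfig()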
            .withJar("command-runner.jar")
            .withArgs("bash", "-c", "aws s3 cp s3://" + BUCKET_NAME + "/pugore-flink.jar /home/hadoop/pugore-flink.jar");

    StepConfig copyJarStep = new StepConfig()
            .withName("Copy Jar")
            .withHadoopJarStep(copyJarStepConf);

    HadoopJarStepConfig flinkJobConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 1"
                    + " --class es.pugore.flink.job.centrality.CentralityJob /home/hadoop/pugore-flink.jar"
                    + " --alpha 0.05"
                    + " --iterations 50"
                    + " --input s3://" + BUCKET_NAME + "/" + key + "/edges.csv"
                    + " --output s3://" + BUCKET_NAME + "/" + key + "/vertices-centrality.csv");

    StepConfig flinkRunJobStep = new StepConfig()
            .withName("Flink job")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(flinkJobConf);

    List<StepConfig> stepConfigs = new ArrayList<>();
    stepConfigs.add(copyJarStep);
    stepConfigs.add(flinkRunJobStep);

    Application flink = new Application().withName("Flink");

    String clusterName = "flink-job-" + key;
    RunJobFlowRequest request = new RunJobFlowRequest()
            .withName(clusterName)
            .withReleaseLabel("emr-5.26.0")
            .withApplications(flink)
            .withServiceRole("EMR_DefaultRole")
            .withJobFlowRole("EMR_EC2_DefaultRole")
            .withLogUri("s3://" + BUCKET_NAME + "/" + key + "/logs")
            .withInstances(new JobFlowInstancesConfig()
                    .withInstanceCount(2)
                    .withKeepJobFlowAliveWhenNoSteps(false)
                    .withMasterInstanceType("m4.large")
                    .withSlaveInstanceType("m4.large"))
            .withSteps(stepConfigs);

    RunJobFlowResult result = getEmrClient().runJobFlow(request);
    String clusterId = result.getJobFlowId();

    log.debug("[" + key + "] cluster created with id: " + clusterId);
-------------------------

This code creates the cluster from scratch and launches my job. The job is executed, but it fails with the following error:

Caused by: java.lang.NoClassDefFoundError: org/apache/flink/graph/GraphAlgorithm
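
If it helps with diagnosis, the job could check for Gelly explicitly before touching any graph code. The sketch below is not from the thread: the class and method names (`GellyClasspathCheck`, `isOnClasspath`, `requireGelly`) are hypothetical, and the only real name it relies on is `org.apache.flink.graph.GraphAlgorithm` from the error above. Calling `requireGelly()` at the start of the job's `main` would turn the `NoClassDefFoundError` into a message that says what to fix, although depending on how the JVM resolves the job class, the original error may still surface first.

```java
/**
 * Hypothetical fail-fast check (not from the thread): reports a clear error when the
 * Flink Gelly classes are not on the classpath.
 */
public final class GellyClasspathCheck {

    // Class from the flink-gelly jar named in the NoClassDefFoundError above.
    static final String GELLY_MARKER_CLASS = "org.apache.flink.graph.GraphAlgorithm";

    private GellyClasspathCheck() {}

    /** Returns true if the class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only locate and load the class, do not run static initializers.
            Class.forName(className, false, GellyClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Throws IllegalStateException with an actionable message if Gelly is missing. */
    static void requireGelly() {
        if (!isOnClasspath(GELLY_MARKER_CLASS)) {
            throw new IllegalStateException(
                    "Flink Gelly is not on the classpath: copy flink-gelly_*.jar from flink/opt "
                            + "to flink/lib, or bundle Gelly into the job jar");
        }
    }

    public static void main(String[] args) {
        System.out.println("Gelly available: " + isOnClasspath(GELLY_MARKER_CLASS));
    }
}
```

It only uses `Class.forName(String, boolean, ClassLoader)` with `initialize` set to `false`, so the check has no side effects beyond loading the class.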

On my local cluster I copy the flink-gelly jar from flink/opt to flink/lib and it works. Is there any way to do this automatically on a transient EMR cluster before the job is launched?

I know I could put the jar in S3 and copy it from there, as I do with my own jar in the first step, and then add it to the classpath. But since it is a Flink dependency, I'm wondering whether EMR can be told to include it in some way, perhaps with an option in Application, Configuration, BootstrapAction or something else.

Thank you

Re: Flink Gelly dependency in a transient EMR cluster

Antonio Martínez Carratalá
I'm replying to myself with the solution in case someone else has the same question.

All that is needed is an extra step that copies the jar from flink/opt to flink/lib and runs before the Flink job step. In my case:

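// Copy the Gelly jar from Flink's opt/ directory into lib/ so that flink run picks it up;
// add this step to stepConfigs before flinkRunJobStep.
StepConfig addGellyStep = new StepConfig()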
        .withName("add-gelly-step")
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                .withArgs("bash", "-c", "sudo cp /usr/lib/flink/opt/flink-gelly_2.11-1.8.0.jar /usr/lib/flink/lib"));
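
The copy command hard-codes the Scala and Flink versions in the jar name (`2.11` and `1.8.0`, which match the Flink build used on `emr-5.26.0` in this thread). A small helper can build the same `bash -c` arguments from those two versions, which makes the step easier to update when moving to another EMR release. This is a sketch with hypothetical names (`GellyStepArgs`, `gellyJarName`, `gellyCopyArgs`); the paths are the ones used in the reply.

```java
/**
 * Hypothetical helper (not from the thread): builds the command-runner.jar arguments for
 * the Gelly copy step from the Scala and Flink versions of the cluster's Flink build.
 */
public final class GellyStepArgs {

    // Flink install locations used in the reply above (emr-5.26.0).
    static final String FLINK_OPT = "/usr/lib/flink/opt";
    static final String FLINK_LIB = "/usr/lib/flink/lib";

    private GellyStepArgs() {}

    /** Returns the Gelly jar file name, e.g. flink-gelly_2.11-1.8.0.jar. */
    static String gellyJarName(String scalaVersion, String flinkVersion) {
        return "flink-gelly_" + scalaVersion + "-" + flinkVersion + ".jar";
    }

    /** Returns the arguments to pass to HadoopJarStepConfig.withArgs(String...). */
    static String[] gellyCopyArgs(String scalaVersion, String flinkVersion) {
        String command = "sudo cp " + FLINK_OPT + "/" + gellyJarName(scalaVersion, flinkVersion)
                + " " + FLINK_LIB;
        return new String[] {"bash", "-c", command};
    }

    public static void main(String[] args) {
        // Versions taken from the jar name in the reply above.
        System.out.println(String.join(" | ", gellyCopyArgs("2.11", "1.8.0")));
    }
}
```

With the AWS SDK for Java classes from the question, the array could be passed as `new HadoopJarStepConfig("command-runner.jar").withArgs(GellyStepArgs.gellyCopyArgs("2.11", "1.8.0"))`. On `emr-5.26.0` steps run one at a time in the order given, so the resulting step would go into `stepConfigs` before `flinkRunJobStep`.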



On Thu, Mar 12, 2020 at 9:43 AM Antonio Martínez Carratalá wrote:
[...]

Re: Flink Gelly dependency in a transient EMR cluster

Till Rohrmann
Alternatively, you could bundle the Gelly dependency into your user code jar by building an uber jar. The downside of this approach is a larger jar, which has to be uploaded to the cluster.
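
If you go the uber-jar route, the build tool determines how Gelly gets bundled (for Maven, the Shade plugin is a common choice, but the thread does not name one). Either way, it is cheap to confirm that the resulting jar actually contains the class from the original error before uploading it. The sketch below uses only `java.util.jar.JarFile`; the class name `UberJarCheck` is hypothetical.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Hypothetical build check (not from the thread): confirms that a job jar bundles the
 * Flink Gelly classes.
 */
public final class UberJarCheck {

    // Jar entry for the class that was missing in the original NoClassDefFoundError.
    static final String GELLY_ENTRY = "org/apache/flink/graph/GraphAlgorithm.class";

    private UberJarCheck() {}

    /** Returns true if the jar at jarPath contains an entry with the given name. */
    static boolean containsEntry(String jarPath, String entryName) throws IOException {
        // try-with-resources closes the jar even if the lookup throws.
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("Usage: java UberJarCheck <path-to-job-jar>");
            return;
        }
        System.out.println(args[0] + " bundles Gelly: " + containsEntry(args[0], GELLY_ENTRY));
    }
}
```

Running it against the built job jar (for example the `pugore-flink.jar` from the question) before the S3 upload would catch a missing Gelly dependency without starting a cluster.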

Cheers,
Till

On Thu, Mar 12, 2020 at 4:13 PM Antonio Martínez Carratalá wrote:
[...]