Run several jobs in parallel in same EMR cluster?


Antonio Martínez Carratalá
Hello

I'm running Flink on Amazon EMR and I'm trying to submit several different batch jobs to the cluster after creating it.

This is my cluster creation code:
----------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnetid")
        .withInstanceCount(2) // 1 for task manager + 1 for job manager
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
----------------------------------------------------------------

And this is how I add the jobs:
---------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
    .withArgs("bash", "-c", "flink run -m yarn-cluster"
        + " --parallelism " + parallelism
        + " --class " + jobClass.getCanonicalName()
        + " /home/hadoop/flink-jobs.jar "
        + jobArguments));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
---------------------------------------------------------------------------------
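
For example, for a job with parallelism 1 the step above ends up executing something like this (the job class name here is just an illustration):
---------------------------------------------------------------------------------
flink run -m yarn-cluster --parallelism 1 --class com.example.Job1 /home/hadoop/flink-jobs.jar <job arguments>
---------------------------------------------------------------------------------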

And these are my jobs:

- Job1 - parallelism 1
- Job2 - parallelism 1
- Job3 - parallelism 2

I'm using m4.large machines as slaves, so I have 2 cores available, and I was expecting Job1 and Job2 to run in parallel, with Job3 starting once Job1 and Job2 finish. Instead, Job2 waits in Pending status for Job1 to finish before starting. I see only one task manager created for Job1; when it finishes, another one is created for Job2, and then two are created for Job3.

Since I have 2 cores available, why isn't Job2 running on the other core instead of waiting? Is there any way to configure this?

Thanks




Re: Run several jobs in parallel in same EMR cluster?

Gary Yao
Can you try setting the config option taskmanager.numberOfTaskSlots to 2? By default the TMs only offer one slot [1], independent of the number of CPU cores.
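
In your cluster creation code that would just be one more entry in the flink-conf classification, e.g. (a sketch based on your snippet above):
----------------------------------------------------------------
Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m")
    // offer one slot per vCPU of the m4.large instead of the default single slot
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
----------------------------------------------------------------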

Best,
Gary






Re: Run several jobs in parallel in same EMR cluster?

Antonio Martínez Carratalá
I could not make it work the way I wanted with taskmanager.numberOfTaskSlots set to 2, but I found a way to run the jobs in parallel: since they are independent, I simply create a separate cluster for each job.
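
Roughly, reusing the snippets from my first message (same helper and variable names), each job now gets its own short-lived cluster that shuts down once its steps finish:
----------------------------------------------------------------
List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep); // the copy-jar-step from before
stepConfigs.add(runJobStep);  // the run-job-step for this job

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(new Application().withName("Flink"))
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("subnetid")
        .withInstanceCount(2)
        // no keep-alive: the cluster terminates after its only job
        .withKeepJobFlowAliveWhenNoSteps(false)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
----------------------------------------------------------------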

Thanks
