Hi, I'm trying to run a job in a Flink cluster in Amazon EMR from Java code, but I'm having some problems.

This is how I create the cluster:
------------------------------------------------------------------------------------------------------------
StepConfig copyJarStep = new StepConfig()
    .withName("copy-jar-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName + "/lib/trendit-flink-jobs.jar /home/hadoop/trendit-flink-jobs.jar"));

List<StepConfig> stepConfigs = new ArrayList<>();
stepConfigs.add(copyJarStep);

Application flink = new Application().withName("Flink");

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.heap.size", "6g")
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("cluster-" + executionKey)
    .withReleaseLabel("emr-5.26.0")
    .withApplications(flink)
    .withConfigurations(flinkConfiguration)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri(getWorkPath() + "logs")
    .withInstances(new JobFlowInstancesConfig()
        .withEc2SubnetId("mysubnetid")
        .withInstanceCount(2)
        .withKeepJobFlowAliveWhenNoSteps(true)
        .withMasterInstanceType("m4.large")
        .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
---------------------------------------------------------------------------------------------------------

And this is how I add the job when the cluster is ready:
------------------------------------------------------------------------------------------
StepConfig runJobStep = new StepConfig()
    .withName("run-job-step")
    .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
    .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
        .withArgs("bash", "-c", "flink run -m yarn-cluster --parallelism 2 --class es.trendit.flink.job.centrality.CentralityJob /home/hadoop/trendit-flink-jobs.jar <args...>"));

AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
    .withJobFlowId(clusterId)
    .withSteps(runJobStep);

AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
-----------------------------------------------------------------------------------------------

As a summary:
- I'm using 2 EMR instances of m4.large machines (2 vCPU, 8GB each)
- jobmanager.heap.size and taskmanager.heap.size: 6g
- taskmanager.numberOfTaskSlots: 2
- I run flink with --parallelism 2
- so 1 EMR instance should be running the JobManager and the other the TaskManager with 2 slots available

But it fails after some time and I see this warning in the step stdout file:
----------------------------------------------------------------------------------------------------------------------
2020-03-31 14:37:47,288 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - This YARN session requires 12288MB of memory in the cluster. There are currently only 6144MB available. The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,294 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor - There is not enough memory available in the YARN cluster. The TaskManager(s) require 6144MB each. NodeManagers available: [6144]
After allocating the JobManager (6144MB) and (0/1) TaskManagers, the following NodeManagers are available: [0]
The Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.
2020-03-31 14:37:47,296 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=6144, taskManagerMemoryMB=6144, numberTaskManagers=1, slotsPerTaskManager=2}
----------------------------------------------------------------------------------------------------------------------

And this error in the step stderr file:
----------------------------------------------------------------------------------------------------------------------
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 1f0a651302d5fd48d35ff5b5d0880f99)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:268)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
        ...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
        ... 23 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate enough slots within timeout of 300000 ms to run the job. Please make sure that the cluster has enough resources.
        at org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleForExecution$0(Execution.java:449)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        ...
----------------------------------------------------------------------------------------------------------------------
It looks to me like the TaskManager is not created at the beginning. Any idea why this is happening and how to solve it? I could not find any relevant information in the Flink docs.

Thanks
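For reference, the "when the cluster is ready" part above can be implemented by polling the cluster state with the same EMR client; a minimal sketch follows (the helper name and the 30-second polling interval are arbitrary illustrations, not part of the original code, and `amazonClient` is the same wrapper used in the snippets above):
------------------------------------------------------------------------------------------
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;

// Hypothetical helper: block until the cluster can accept the run-job-step.
// "WAITING" means the cluster is up and idle; "TERMINATED*" means it failed or was shut down.
private void waitUntilClusterReady(String clusterId) throws InterruptedException {
    while (true) {
        DescribeClusterResult describe = amazonClient.getEmrClient()
                .describeCluster(new DescribeClusterRequest().withClusterId(clusterId));
        String state = describe.getCluster().getStatus().getState();
        if ("WAITING".equals(state)) {
            return; // ready, the run-job-step can be added now
        }
        if (state.startsWith("TERMINATED")) {
            throw new IllegalStateException("Cluster " + clusterId + " is " + state);
        }
        Thread.sleep(30_000); // poll every 30 seconds
    }
}
------------------------------------------------------------------------------------------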
Hey,
Isn't the explanation of the problem in the logs that you posted? Not enough memory? You have 2 EMR nodes with 8GB of memory each, while trying to allocate 2 TaskManagers AND 1 JobManager with a 6GB heap size each?

Piotrek

> On 31 Mar 2020, at 17:01, Antonio Martínez Carratalá <[hidden email]> wrote:
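For anyone hitting the same warning: on EMR only the core nodes run a YARN NodeManager (the master does not), and here that single NodeManager advertises 6144MB, which matches the figure in the log above. So with 2 instances there is one 6144MB NodeManager, and a 6144MB JobManager plus a 6144MB TaskManager cannot both fit. A minimal sketch of a flink-conf classification that would fit into that single NodeManager, where the concrete heap sizes are assumptions to be adjusted to the job:
------------------------------------------------------------------------------------------
Configuration flinkConfiguration = new Configuration()
        .withClassification("flink-conf")
        // Assumed sizes: roughly 1g for the JobManager container and 4g for the
        // TaskManager container, so both fit under the 6144MB that the single
        // NodeManager advertises in the warning above.
        .addPropertiesEntry("jobmanager.heap.size", "1g")
        .addPropertiesEntry("taskmanager.heap.size", "4g")
        .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");
------------------------------------------------------------------------------------------
The alternative is to keep the 6g heaps and add a second core node, as discussed below.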
Hi Piotr,

I don't have 2 TaskManagers, just one with 2 slots. That would be OK according to my calculations, but as Craig said, I need one more instance for the cluster master. I was assuming the JobManager was running on the master and the TaskManager on the slave, but both the JobManager and the TaskManager run on slaves, so I need 3 instances instead of the 2 I guessed.

Regards

On Wed, Apr 1, 2020 at 1:31 PM Piotr Nowojski <[hidden email]> wrote:
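A minimal sketch of the corresponding change to the cluster request; only the instance configuration is shown, everything else stays as in the first snippet:
------------------------------------------------------------------------------------------
RunJobFlowRequest request = new RunJobFlowRequest()
        // ... name, release label, application, configuration, roles and log URI as before ...
        .withInstances(new JobFlowInstancesConfig()
                .withEc2SubnetId("mysubnetid")
                // 1 master (no NodeManager) + 2 core nodes, so the 6144MB JobManager
                // and the 6144MB TaskManager each get their own NodeManager
                .withInstanceCount(3)
                .withKeepJobFlowAliveWhenNoSteps(true)
                .withMasterInstanceType("m4.large")
                .withSlaveInstanceType("m4.large"))
        .withSteps(stepConfigs);
------------------------------------------------------------------------------------------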
Hi,
Sorry, I missed that. But yes, it looks like you are running two JobManagers :) You can always check the YARN logs for more information about what is being executed.

Piotrek
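If it helps, those logs can also be collected without SSHing into the master by running the yarn logs CLI as another step through command-runner.jar; a sketch follows, where <application_id> is a placeholder for the YARN application id of the Flink session (visible in the ResourceManager UI or in the step output). Since the cluster was created with a log URI, the same container logs should eventually also show up under that S3 location.
------------------------------------------------------------------------------------------
StepConfig yarnLogsStep = new StepConfig()
        .withName("collect-yarn-logs-step")
        .withActionOnFailure(ActionOnFailure.CONTINUE)
        .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
                // <application_id> is a placeholder, e.g. taken from the ResourceManager UI
                .withArgs("bash", "-c", "yarn logs -applicationId <application_id>"));

AddJobFlowStepsRequest logsRequest = new AddJobFlowStepsRequest()
        .withJobFlowId(clusterId)
        .withSteps(yarnLogsStep);

amazonClient.getEmrClient().addJobFlowSteps(logsRequest);
------------------------------------------------------------------------------------------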