Hi,
I created a small application that needs to run multiple (batch) jobs on Yarn and then terminate. In this case I'm exporting data from a list of HBase tables.

Essentially, what I do right now is the following:

    flink run -m yarn-cluster -yn 10 bla.jar ...

And in my main I do:

    foreach thing I need to do {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env. ... define the batch job.
        env.execute();
    }

On the second job I submit I get an exception:

    java.lang.RuntimeException: Unable to tell application master to stop once the specified job has been finised
        at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
        at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
        at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
        at com.bol.tools.hbase.export.Main.run(Main.java:81)
        at com.bol.tools.hbase.export.Main.main(Main.java:42)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
        at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
        at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at scala.concurrent.Await.result(package.scala)
        at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
        ... 25 more

How do I (without using a yarn-session) tell the YarnClusterClient to simply 'keep running because there will be more jobs'?

If I run this same code in a yarn-session it works, but then I have the trouble of starting a (detached) yarn-session AND terminating that thing again after my jobs have run.

--
Best regards / Met vriendelijke groeten,
Niels Basjes
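The loop in the message above can be sketched as a small stand-alone Java program. Note this is only an illustration of the one-environment-per-job control flow: Flink's real `ExecutionEnvironment` (from `org.apache.flink.api.java`) is replaced by a minimal stand-in, and the table names and job body are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MultiJobSketch {
    // Stand-in for org.apache.flink.api.java.ExecutionEnvironment,
    // kept minimal so the control flow is visible.
    static class ExecutionEnvironment {
        static ExecutionEnvironment getExecutionEnvironment() {
            return new ExecutionEnvironment();
        }
        void execute(String jobName) {
            // In real Flink this call blocks until the batch job finishes.
            // In yarn-cluster (per-job) mode, each such call is a separate
            // submission against the same application master, which is what
            // triggered the stopAfterJob timeout in the thread above.
            System.out.println("executed: " + jobName);
        }
    }

    // One batch job per table: a fresh environment is created for each,
    // mirroring the foreach loop in the original message.
    static List<String> exportAll(List<String> tables) {
        List<String> finished = new ArrayList<>();
        for (String table : tables) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // ... define the HBase export job for `table` here ...
            env.execute("export-" + table);
            finished.add(table);
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(exportAll(Arrays.asList("table1", "table2")));
    }
}
```

The point of the sketch is that every iteration submits an independent job; the failure in the thread happens because the per-job YARN client assumes there is only ever one such submission.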
Hi Niels,
This is with 1.1.1? We could fix this in the upcoming 1.1.2 release by only using automatic shutdown for detached jobs. In all other cases we should be able to shut down from the client side after running all jobs. The only downside I see is that Flink clusters may actually never be shut down if the CLI somehow crashes or gets a network partition.

Best,
Max

On Thu, Aug 25, 2016 at 12:04 PM, Niels Basjes <[hidden email]> wrote:
> [quoted text hidden]
I have this with a pretty recent build from source (not a release).

Would be great if you see a way to fix this. I consider it fine if this requires an extra call to the system indicating that this is a 'multiple job' situation.

I created https://issues.apache.org/jira/browse/FLINK-4495 for you.

On Thu, Aug 25, 2016 at 3:34 PM, Maximilian Michels <[hidden email]> wrote:
> [quoted text hidden]

--
Best regards / Met vriendelijke groeten,

Niels Basjes
Thanks Niels, actually I also created one :) We will fix this on the
master and for the 1.1.2 release.

On Thu, Aug 25, 2016 at 5:14 PM, Niels Basjes <[hidden email]> wrote:
> [quoted text hidden]