Running multiple jobs on yarn (without yarn-session)


Niels Basjes
Hi,

I created a small application that needs to run multiple (batch) jobs on YARN and then terminate.
In this case I'm exporting data from a list of HBase tables.

Right now I essentially do the following:

flink run -m yarn-cluster -yn 10  bla.jar ...

And in my main I do:

foreach table I need to export {
   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   // ... define the batch job ...
   env.execute();
}
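
For concreteness, a minimal self-contained sketch of what my main does looks roughly like this (the table names and the job body are made up for illustration; the real job reads each HBase table):

   import org.apache.flink.api.java.ExecutionEnvironment;

   public class Main {
       public static void main(String[] args) throws Exception {
           // made-up list; in reality this comes from my configuration
           String[] tablesToExport = {"table_a", "table_b"};
           for (String table : tablesToExport) {
               // each iteration builds and submits a separate batch job
               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
               env.fromElements(table)               // placeholder for the real HBase input
                  .writeAsText("/export/" + table);
               env.execute("Export " + table);       // the second call is where it fails
           }
       }
   }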

On the second job I submit, I get an exception:
java.lang.RuntimeException: Unable to tell application master to stop once the specified job has been finised
at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:184)
at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:202)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:376)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:61)
at com.bol.tools.hbase.export.Main.runSingleTable(Main.java:220)
at com.bol.tools.hbase.export.Main.run(Main.java:81)
at com.bol.tools.hbase.export.Main.main(Main.java:42)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:775)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:995)
at org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:992)
at org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:56)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:53)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:992)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1046)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:182)
... 25 more


How do I (without using yarn-session) tell the YarnClusterClient to simply 'keep running because there will be more jobs'?

If I run this same code in a yarn-session it works, but then I have the trouble of starting a (detached) yarn-session AND terminating that thing again after my jobs have run.

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Running multiple jobs on yarn (without yarn-session)

Maximilian Michels
Hi Niels,

Is this with 1.1.1? We could fix this in the upcoming 1.1.2 release by
only using automatic shutdown for detached jobs. In all other cases we
should be able to shut down from the client side after running all jobs.
The only downside I see is that Flink clusters may never be shut down
if the CLI crashes or is cut off by a network partition.

Best,
Max


Re: Running multiple jobs on yarn (without yarn-session)

Niels Basjes
I'm seeing this with a pretty recent build from source (not a release).

It would be great if you can find a way to fix this.
I consider it fine if this requires an extra call to the system indicating that this is a 'multiple jobs' situation.

I created https://issues.apache.org/jira/browse/FLINK-4495 for you.
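
To sketch the kind of 'extra call' I mean (purely hypothetical; as far as I know no such method exists in YarnClusterClient today):

   // Purely hypothetical -- just to illustrate the kind of API that would
   // work for me: tell the client to keep the YARN cluster alive after a
   // job finishes, because more jobs will be submitted from this program.
   yarnClusterClient.setKeepClusterAliveBetweenJobs(true);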

--
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Running multiple jobs on yarn (without yarn-session)

Maximilian Michels
Thanks Niels, actually I also created one :) We will fix this on the master and for the 1.1.2 release.
