Flink on Yarn - ApplicationMaster command

Theofilos Kakantousis
Hi everyone,

I'm using Flink 0.10.1 and Hadoop 2.4.0 to implement a client that
submits a Flink application to YARN. To keep it simple, I use the
ConnectedComponents app from the Flink examples.

I set the required properties (resources, AM ContainerLaunchContext,
etc.) on the YARN client interface. The JobManager and TaskManager
processes start and, according to the logs, the containers are running,
but the actual application does not start. I'm probably missing the
proper way to pass parameters to the ApplicationMaster, so it cannot
pick up the application it needs to run. Does anyone know where I could
find some info on how to pass runtime parameters to the AppMaster?

The ApplicationMaster launch_container.sh script includes the following:
exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
org.apache.flink.yarn.ApplicationMaster  -c
org.apache.flink.examples.java.graph.ConnectedComponents 1>
/tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
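For context, here is a minimal sketch of how a launch command string of that shape is typically assembled before YARN writes it into launch_container.sh. The helper below is hypothetical, not Flink or YARN API, and the redirection targets are examples:

```java
import java.util.Arrays;
import java.util.List;

public class AmCommandSketch {
    // Hypothetical helper: join the JVM invocation, main class, extra
    // arguments, and stdout/stderr redirections into one command string,
    // the shape YARN expects in a ContainerLaunchContext command list.
    static String build(String mainClass, String heap, List<String> args,
                        String stdout, String stderr) {
        StringBuilder cmd = new StringBuilder();
        cmd.append("$JAVA_HOME/bin/java -Xmx").append(heap).append(' ');
        cmd.append(mainClass);
        for (String a : args) {
            cmd.append(' ').append(a);
        }
        cmd.append(" 1> ").append(stdout).append(" 2> ").append(stderr);
        return cmd.toString();
    }

    public static void main(String[] unused) {
        System.out.println(build(
                "org.apache.flink.yarn.ApplicationMaster", "1024M",
                Arrays.asList("-c", "org.apache.flink.examples.java.graph.ConnectedComponents"),
                "<LOG_DIR>/jobmanager.out", "<LOG_DIR>/jobmanager.err"));
    }
}
```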

Thank you,
Theofilos

Re: Flink on Yarn - ApplicationMaster command

Maximilian Michels
Hi Theofilos,

I'm not sure whether I understand correctly what you are trying to do.
I'm assuming you don't want to use the command-line client.

You can set up the YARN cluster manually in your code using the
FlinkYarnClient class. The deploy() method will give you a
FlinkYarnCluster, which you can use to connect to the deployed cluster.
Then get the JobManager address and use the Client class to submit
Flink jobs to the cluster. I have to warn you that these classes are
subject to change in Flink 1.1.0 and above.

Let me know if the procedure works for you.

Cheers,
Max

On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <[hidden email]> wrote:

> Hi everyone,
>
> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that submits a
> flink application to Yarn. To keep it simple I use the ConnectedComponents
> app from flink examples.
>
> I set the required properties (Resources, AM ContainerLaunchContext etc.) on
> the YARN client interface. What happens is the JobManager and TaskManager
> processes start and based on the logs containers are running but the actual
> application does not start. I'm probably missing the proper way to pass
> parameters to the ApplicationMaster and it cannot pick up the application it
> needs to run. Anyone knows where I could get some info on how to pass
> runtime params to the AppMaster?
>
> The ApplicationMaster launchcontainer script includes the following:
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
> org.apache.flink.yarn.ApplicationMaster  -c
> org.apache.flink.examples.java.graph.ConnectedComponents 1>
> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>
> Thank you,
> Theofilos
>

Re: Flink on Yarn - ApplicationMaster command

Theofilos Kakantousis
Hi Max,

Thank you for your reply. Exactly, I want to set up the YARN cluster and
submit a job through code rather than with the command-line client.
I had already done what you suggested: I used part of the deploy() method
to write my own code that starts up the cluster, which seems to be working fine.

Could you point me to some examples of how to use the Client you mention?

Cheers,
Theofilos

On 2016-04-19 16:35, Maximilian Michels wrote:

> Hi Theofilos,
>
> I'm not sure whether I understand correctly what you are trying to do.
> I'm assuming you don't want to use the command-line client.
>
> You can setup the Yarn cluster in your code manually using the
> FlinkYarnClient class. The deploy() method will give you a
> FlinkYarnCluster which you can use to connect to the deployed cluster.
> Then get the JobManager address and use the Client class to submit
> Flink jobs to the cluster. I have to warn you that these classes are
> subject to change in Flink 1.1.0 and above.
>
> Let me know if the procedure works for you.
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <[hidden email]> wrote:
>> Hi everyone,
>>
>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that submits a
>> flink application to Yarn. To keep it simple I use the ConnectedComponents
>> app from flink examples.
>>
>> I set the required properties (Resources, AM ContainerLaunchContext etc.) on
>> the YARN client interface. What happens is the JobManager and TaskManager
>> processes start and based on the logs containers are running but the actual
>> application does not start. I'm probably missing the proper way to pass
>> parameters to the ApplicationMaster and it cannot pick up the application it
>> needs to run. Anyone knows where I could get some info on how to pass
>> runtime params to the AppMaster?
>>
>> The ApplicationMaster launchcontainer script includes the following:
>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>> org.apache.flink.yarn.ApplicationMaster  -c
>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>
>> Thank you,
>> Theofilos
>>


Re: Flink on Yarn - ApplicationMaster command

Maximilian Michels
Hi Theofilos,

Assuming you have the FlinkYarnCluster after the call to deploy(), you
can get the JobManager address with:

InetSocketAddress address = cluster.getJobManagerAddress();

Then create a Configuration with this address:

Configuration config = new Configuration();
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());

Then create the client:

Client client = new Client(config);

Then use it to submit jobs in blocking or detached mode, e.g.:

client.runBlocking(...);
client.runDetached(...);
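As a side note, if the JobManager address arrives as a plain "host:port" string instead of an InetSocketAddress (e.g. read from your own config file), the two values for the Configuration keys can be split out like this. This is a pure-JDK sketch and the helper name is made up:

```java
import java.net.InetSocketAddress;

public class JobManagerAddressSketch {
    // Hypothetical helper: split "host:port" into the host and port that
    // JOB_MANAGER_IPC_ADDRESS_KEY and JOB_MANAGER_IPC_PORT_KEY expect.
    static InetSocketAddress parse(String hostPort) {
        int sep = hostPort.lastIndexOf(':');
        if (sep < 0) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, sep);
        int port = Integer.parseInt(hostPort.substring(sep + 1));
        // Unresolved: no DNS lookup here; the client resolves it later.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress addr = parse("jobmanager.example.com:6123");
        System.out.println(addr.getHostName() + ":" + addr.getPort());
    }
}
```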

Cheers,
Max

On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis <[hidden email]> wrote:

> Hi Max,
>
> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
> submit a job through code and not using cmd client.
> I had done what you suggested, I used part of the deploy method to write my
> own code that starts up the cluster which seems to be working fine.
>
> Could you point me to some examples how to use the Client you mention?
>
> Cheers,
> Theofilos
>
>
> On 2016-04-19 16:35, Maximilian Michels wrote:
>>
>> Hi Theofilos,
>>
>> I'm not sure whether I understand correctly what you are trying to do.
>> I'm assuming you don't want to use the command-line client.
>>
>> You can setup the Yarn cluster in your code manually using the
>> FlinkYarnClient class. The deploy() method will give you a
>> FlinkYarnCluster which you can use to connect to the deployed cluster.
>> Then get the JobManager address and use the Client class to submit
>> Flink jobs to the cluster. I have to warn you that these classes are
>> subject to change in Flink 1.1.0 and above.
>>
>> Let me know if the procedure works for you.
>>
>> Cheers,
>> Max
>>
>> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <[hidden email]>
>> wrote:
>>>
>>> Hi everyone,
>>>
>>> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
>>> submits a
>>> flink application to Yarn. To keep it simple I use the
>>> ConnectedComponents
>>> app from flink examples.
>>>
>>> I set the required properties (Resources, AM ContainerLaunchContext etc.)
>>> on
>>> the YARN client interface. What happens is the JobManager and TaskManager
>>> processes start and based on the logs containers are running but the
>>> actual
>>> application does not start. I'm probably missing the proper way to pass
>>> parameters to the ApplicationMaster and it cannot pick up the application
>>> it
>>> needs to run. Anyone knows where I could get some info on how to pass
>>> runtime params to the AppMaster?
>>>
>>> The ApplicationMaster launchcontainer script includes the following:
>>> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
>>> org.apache.flink.yarn.ApplicationMaster  -c
>>> org.apache.flink.examples.java.graph.ConnectedComponents 1>
>>> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>>>
>>> Thank you,
>>> Theofilos
>>>
>

Re: Flink on Yarn - ApplicationMaster command

Theofilos Kakantousis
Hi Max,

I managed to get the JobManager address from the FlinkYarnCluster, however when I submit a job using the code below, the jobID is null.
Is there something wrong in the way I submit the job? Otherwise, any ideas on which direction I should investigate further?

The runBlocking call returns almost immediately. There is no indication that the job reaches the JobManager, as the last entries in the JobManager and TaskManager logs only show that the processes started successfully.


String[] args = {""};
File file = new File("/srv/flink/examples/ConnectedComponents.jar");
int parallelism = 1;
InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
org.apache.flink.configuration.Configuration clientConf = new org.apache.flink.configuration.Configuration();
clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getHostName());
Client client = new Client(clientConf);
try {
    PackagedProgram program = new PackagedProgram(file, "org.apache.flink.examples.java.graph.ConnectedComponents", args);
    client.setPrintStatusDuringExecution(true);
    JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
    JobID jobID = jobRes.getJobID();
} catch (ProgramInvocationException ex) {
    Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null, ex);
}


Thanks,
Theofilos


On 2016-04-22 16:05, Maximilian Michels wrote:


Re: Flink on Yarn - ApplicationMaster command

Theofilos Kakantousis
Hi,

The issue was a mismatch of jar versions on my client. It seems to be working fine now.
Thanks again for your help!
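For anyone who hits the same problem: since jars are just zip archives with a manifest, one quick way to compare the Flink version each jar was built against is to read its Implementation-Version attribute. This is a pure-JDK sketch; the path passed to main is an example, and not every jar records this attribute:

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class JarVersionCheck {
    // Sketch: read the Implementation-Version a jar was built with, so the
    // client-side and cluster-side Flink jars can be compared for mismatches.
    // Not all jars set this attribute, hence the null checks.
    static String implementationVersion(File jar) throws IOException {
        try (JarFile jf = new JarFile(jar)) {
            Manifest mf = jf.getManifest();
            if (mf == null) {
                return null;
            }
            return mf.getMainAttributes().getValue("Implementation-Version");
        }
    }

    public static void main(String[] args) throws IOException {
        // Path is an example; point it at the Flink jars your client and cluster use.
        System.out.println(implementationVersion(new File(args[0])));
    }
}
```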

Cheers,
Theofilos


On 2016-04-22 18:22, Theofilos Kakantousis wrote:

Re: Flink on Yarn - ApplicationMaster command

Maximilian Michels
Great to hear! :)

On Sun, Apr 24, 2016 at 3:51 PM, Theofilos Kakantousis <[hidden email]> wrote:

> Hi,
>
> The issue was a mismatch of jar versions on my client. Seems to be working
> fine now.
> Thanks again for your help!
>
> Cheers,
> Theofilos
>
>
> On 2016-04-22 18:22, Theofilos Kakantousis wrote:
>
> Hi Max,
>
> I manage to get the jobManagerAddress from FlinkYarnCluster, however when I
> submit a job using the code below the jobID is null.
> Is there something wrong in the way I submit the job? Otherwise any ideas to
> which direction should I further investigate?
>
> The runBlocking call returns almost immediately. There is no indication the
> job reaches the JobManager as the last log entries for the jobmanager and
> taskmanager logs are that the processes have started successfully.
>
>
> String[] args = {""};
> File file = new File("/srv/flink/examples/ConnectedComponents.jar");
> int parallelism = 1;
> InetSocketAddress jobManagerAddress = cluster.getJobManagerAddress();
> org.apache.flink.configuration.Configuration clientConf = new
> org.apache.flink.configuration.Configuration();
> clientConf.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> jobManagerAddress.getPort());
> clientConf.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> jobManagerAddress.getHostName());
> Client client = new Client(clientConf);
> try {
>     PackagedProgram program = new PackagedProgram(file,
> "org.apache.flink.examples.java.graph.ConnectedComponents", args);
>     client.setPrintStatusDuringExecution(true);
>     JobSubmissionResult jobRes = client.runBlocking(program, parallelism);
>     JobID jobID = jobRes.getJobID();
> } catch (ProgramInvocationException ex) {
>     Logger.getLogger(YarnRunner.class.getName()).log(Level.SEVERE, null,
> }
>
>
> Thanks,
> Theofilos
>
>
> On 2016-04-22 16:05, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> Assuming you have the FlinkYarnCluster after the call to deploy(). You
> can get the JobManager address using the
>
> InetSocketAddress address = cluster.getJobManagerAddress();
>
> Then create a Configuration with this address:
>
> Configuration config = new Configuration();
> config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY,
> address.getHostName());
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
> address.getPort());
>
> Then the client:
>
> Client client = new Client(config);
>
> Then use it to submit jobs blocking/detached, e.g.
>
> client.runBlocking(...);
> client.runDetached(...);
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 11:25 PM, Theofilos Kakantousis <[hidden email]> wrote:
>
> Hi Max,
>
> Thank you for your reply. Exactly, I want to setup the Yarn cluster and
> submit a job through code and not using cmd client.
> I had done what you suggested, I used part of the deploy method to write my
> own code that starts up the cluster which seems to be working fine.
>
> Could you point me to some examples how to use the Client you mention?
>
> Cheers,
> Theofilos
>
>
> On 2016-04-19 16:35, Maximilian Michels wrote:
>
> Hi Theofilos,
>
> I'm not sure whether I understand correctly what you are trying to do.
> I'm assuming you don't want to use the command-line client.
>
> You can setup the Yarn cluster in your code manually using the
> FlinkYarnClient class. The deploy() method will give you a
> FlinkYarnCluster which you can use to connect to the deployed cluster.
> Then get the JobManager address and use the Client class to submit
> Flink jobs to the cluster. I have to warn you that these classes are
> subject to change in Flink 1.1.0 and above.
>
> Let me know if the procedure works for you.
>
> Cheers,
> Max
>
> On Tue, Apr 19, 2016 at 2:37 PM, Theofilos Kakantousis <[hidden email]>
> wrote:
>
> Hi everyone,
>
> I'm using Flink 0.10.1 and hadoop 2.4.0 to implement a client that
> submits a
> flink application to Yarn. To keep it simple I use the
> ConnectedComponents
> app from flink examples.
>
> I set the required properties (Resources, AM ContainerLaunchContext etc.)
> on
> the YARN client interface. What happens is the JobManager and TaskManager
> processes start and based on the logs containers are running but the
> actual
> application does not start. I'm probably missing the proper way to pass
> parameters to the ApplicationMaster and it cannot pick up the application
> it
> needs to run. Anyone knows where I could get some info on how to pass
> runtime params to the AppMaster?
>
> The ApplicationMaster launchcontainer script includes the following:
> exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx1024M
> org.apache.flink.yarn.ApplicationMaster  -c
> org.apache.flink.examples.java.graph.ConnectedComponents 1>
> /tmp/stdOut5237161854714899800 2>  /tmp/stdErr606502839107545371 "
>
> Thank you,
> Theofilos
>
>
>