The program didn't contain a Flink job


The program didn't contain a Flink job

eSKa
Hello, we are currently running jobs on Flink 1.4.2. Our use case is as follows:
- a service gets a request from a customer
- we submit the job to Flink using YarnClusterClient
Sometimes we have up to 6 jobs running at the same time.

From time to time we get the error below:
The program didn't contain a Flink job.
org.apache.flink.client.program.ProgramMissingJobException: The program didn't contain a Flink job.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:398)


From the logs we can see that the job's main method returns a correct status, but for some reason Flink later throws that exception anyway. Do you know what could be the cause here and how to prevent it from happening?


Re: The program didn't contain a Flink job

Rong Rong
Did you forget to call executionEnvironment.execute() after you define your Flink job?
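
For reference, a minimal sketch (DataStream API; class name, job name and data are illustrative) where the job graph only reaches the cluster because execute() is called at the end of main():

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();

        // Without this call the program returns normally but never defines a
        // job, which typically leads to ProgramMissingJobException.
        env.execute("execute-example");
    }
}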

--
Rong


Re: The program didn't contain a Flink job

eSKa
No.
execute() was called and all calculations succeeded - the job showed up on the
dashboard with status FINISHED.
After execute() our own logs also claimed that everything succeeded.




Re: The program didn't contain a Flink job

Rong Rong
Hmm. That's strange. 

Can you explain a little more about how your YARN cluster is set up and how you configure the submission context?
Also, did you try submitting the jobs in detached mode?

Is this happening from time to time for one specific job graph? Or is it consistently throwing the exception for the same job?

--
Rong




Re: The program didn't contain a Flink job

eSKa
We are running the same job all the time, and that error happens from time to time.


Here is the job submission code:
private JobSubmissionResult submitProgramToCluster(PackagedProgram packagedProgram)
        throws JobSubmitterException, ProgramMissingJobException, ProgramInvocationException {

    ClusterClient clusterClient = clusterClientUtil.getPrototypeClusterClient();
    int parallelism = Integer.parseInt(serverConfiguration.envParallelism);

    return clusterClient.run(packagedProgram, parallelism);
}


And here is our util for retrieving the ClusterClient.
public class ClusterClientUtil {

    ...

    public ClusterClient getPrototypeClusterClient() throws JobSubmitterException {
        return createClusterClientInstance();
    }

    private synchronized ClusterClient createClusterClientInstance() throws JobSubmitterException {
        try {
            LOG.info("Creating new ClusterClient instance.");
            Configuration configuration = flinkConfigurator.getFlinkConfiguration();

            ApplicationId applicationId = ConverterUtils.toApplicationId(
                    configuration.getString(FlinkConfigurator.PROPERTY_FLINK_APP_ID, ""));
            logger.debug("Retrieved Flink applicationId: {}", applicationId.toString());

            YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
                    configuration, serverConfiguration.getFlinkConfigurationDirectory);

            ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);

            final int numberTaskManagers = configuration.getInteger(FlinkConfigurator.PROPERTY_FLINK_NUMBER_TASK_MANAGERS, 0);
            final int slotsPerTaskManager = configuration.getInteger(FlinkConfigurator.PROPERTY_FLINK_SLOTS_PER_TASK_MANAGER, -1);

            return new YarnClusterClient(yarnClusterDescriptor, numberTaskManagers, slotsPerTaskManager,
                    yarnClient, applicationReport, configuration, false);
        } catch (Exception e) {
            throw new JobSubmitterException("Unable to create YarnClusterClient.", e);
        }
    }

Which YARN settings do you need?


Re: The program didn't contain a Flink job

Chesnay Schepler
Are you executing these jobs concurrently?
The ClusterClient was not written to be used concurrently in the same JVM, as it partially relies on and mutates static fields.
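
A rough sketch of one way to work around that (class and method names here are made up; the actual ClusterClient.run(...) call is the one from your snippet): funnel every submission through a single thread, so at most one ClusterClient.run(...) call is active per JVM at a time.

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Queues every submission onto one thread so that only one
// ClusterClient.run(...) call mutates the static execution context at a time.
public class SerializedJobSubmitter {

    private final ExecutorService submissionThread = Executors.newSingleThreadExecutor();

    // Callers pass the actual submission, e.g.
    // () -> clusterClient.run(packagedProgram, parallelism)
    public <T> Future<T> submit(Callable<T> submission) {
        return submissionThread.submit(submission);
    }

    public void shutdown() {
        submissionThread.shutdown();
    }
}

Callers can then block on the returned Future if they need the JobSubmissionResult synchronously.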


Re: The program didn't contain a Flink job

eSKa
Yes - we are submitting jobs one by one.
How can we change that to work for our needs?




Re: The program didn't contain a Flink job

Fabian Hueske-2
In reply to this post by Rong Rong
Hi,

Let me summarize:
1) Sometimes you get the error message "org.apache.flink.client.program.ProgramMissingJobException: The program didn't contain a Flink job." when submitting a program through the YarnClusterClient.
2) The logs and the dashboard state that the job ran successfully.
3) The job performed all computations correctly.

So the issue is that there is an invalid error message suggesting that a job failed, when in fact it ran successfully.

Is that correct?

Thanks, Fabian



Re: The program didn't contain a Flink job

Chuanlei Ni
In reply to this post by eSKa
Hi @chesnay,

I read the code of `ClusterClient` and did not find any `static` fields.

So why can't it be used concurrently in the same JVM? (We also use `ClusterClient` this way, so we really care about this feature.)


Re: The program didn't contain a Flink job

Chesnay Schepler
Dive into this call and you will see that it mutates static fields in the ExecutionEnvironment.

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java#L422
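
To make the hazard concrete, here is a deliberately simplified illustration (made up, not Flink's actual code) of how a JVM-wide static slot breaks concurrent submissions: whichever thread loses the race on the shared context ends up looking as if it never defined a job.

public class StaticContextRace {

    // JVM-wide slot, analogous to the static execution context that
    // ClusterClient.run(...) installs for the user's main() to pick up.
    private static volatile String currentJobContext;

    static void submit(String jobName) throws InterruptedException {
        currentJobContext = jobName;   // this submission installs its context
        Thread.sleep(10);              // ... the user's main() would run here ...
        if (!jobName.equals(currentJobContext)) {
            // another submission overwrote the slot in the meantime
            System.out.println(jobName + ": lost its context");
        }
        currentJobContext = null;      // teardown clears the shared slot
    }

    public static void main(String[] args) throws Exception {
        Thread a = new Thread(() -> { try { submit("job-A"); } catch (InterruptedException ignored) { } });
        Thread b = new Thread(() -> { try { submit("job-B"); } catch (InterruptedException ignored) { } });
        a.start();
        b.start();
        a.join();
        b.join();
    }
}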


Re: The program didn't contain a Flink job

eSKa
In reply to this post by Fabian Hueske-2
Yes - it seems that the main method returns success, but for some reason the
exception is thrown anyway.
For now we applied a workaround to catch the exception and just skip it (later
on our statusUpdater reads the statuses from the Flink dashboard).
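
Roughly like this (a sketch only; ClusterClient, PackagedProgram and the parallelism come from the snippet earlier in the thread, the wrapper class itself is illustrative):

import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TolerantSubmitter {

    private static final Logger LOG = LoggerFactory.getLogger(TolerantSubmitter.class);

    public static JobSubmissionResult submitIgnoringMissingJob(
            ClusterClient clusterClient, PackagedProgram program, int parallelism)
            throws ProgramInvocationException {
        try {
            return clusterClient.run(program, parallelism);
        } catch (ProgramMissingJobException e) {
            // The job already reached FINISHED on the dashboard, so log and
            // continue; the statusUpdater reconciles the real state later.
            LOG.warn("Ignoring ProgramMissingJobException; status will be read "
                    + "back from the Flink dashboard.", e);
            return null;
        }
    }
}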






Re: The program didn't contain a Flink job

Chuanlei Ni
I'm really interested in making it possible to use multiple `ClusterClient` instances in one JVM, because we need to submit jobs from a long-running process.

I created a JIRA for this problem.

