Re: RestClusterClient and classpath

Posted by rmetzger0 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/RestClusterClient-and-classpath-tp38955p38978.html

Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).

On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
Hi to all,
I was trying to use the RestClusterClient to submit my job to the Flink cluster.
However when I submit the job Flink cannot find the classes contained in the "fat" jar..what should I do? Am I missing something in my code?
This is the current client code I'm testing:

public static void main(String[] args) throws MalformedURLException {
    final Configuration flinkConf = new Configuration();
    flinkConf.set(RestOptions.ADDRESS, "localhost");
    flinkConf.set(RestOptions.PORT, 8081);

    final File jarFile = new File("/tmp/job-bundle.jar");
    final String jobClass = "it.flink.MyJob";

    try {
      final RestClusterClient<StandaloneClusterId> client =
          new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());

      final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
          .setJarFile(jarFile)//
          // .setUserClassPaths(userClassPaths)
          .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
          .build();

      final JobGraph jobGraph =
          PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);

      final DetachedJobExecutionResult jobExecutionResult =
          client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();

      System.out.println(jobExecutionResult.getJobID());
    } catch (Exception ex) {
      ex.printStackTrace();
      System.exit(1);
    }
}

Best,
Flavio