RestClusterClient and classpath


RestClusterClient and classpath

Flavio Pompermaier
Hi to all,
I was trying to use the RestClusterClient to submit my job to the Flink cluster.
However, when I submit the job, Flink cannot find the classes contained in the "fat" jar. What should I do? Am I missing something in my code?
This is the current client code I'm testing:

public static void main(String[] args) throws MalformedURLException {
    final Configuration flinkConf = new Configuration();
    flinkConf.set(RestOptions.ADDRESS, "localhost");
    flinkConf.set(RestOptions.PORT, 8081);

    final File jarFile = new File("/tmp/job-bundle.jar");
    final String jobClass = "it.flink.MyJob";

    try {
      final RestClusterClient<StandaloneClusterId> client =
          new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());

      final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
          .setJarFile(jarFile)//
          // .setUserClassPaths(userClassPaths)
          .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
          .build();

      final JobGraph jobGraph =
          PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);

      final DetachedJobExecutionResult jobExecutionResult =
          client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();

      System.out.println(jobExecutionResult.getJobID());
    } catch (Exception ex) {
      ex.printStackTrace();
      System.exit(1);
    }
}
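
For reference, the commented-out setUserClassPaths line appears to expect a list of URLs; a minimal sketch of how such a list could be built from the same jar (hypothetical, not part of the code above) is:

// Hypothetical sketch (not in the original code): one way to build the
// userClassPaths list that the commented-out setUserClassPaths(...) call
// could take. jarFile.toURI().toURL() throws MalformedURLException, which
// would also explain the "throws" clause on main.
final java.util.List<java.net.URL> userClassPaths =
    java.util.Collections.singletonList(jarFile.toURI().toURL());
// then in the builder: .setUserClassPaths(userClassPaths)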

Best,
Flavio

Re: RestClusterClient and classpath

rmetzger0
Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).


Re: RestClusterClient and classpath

Flavio Pompermaier
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster, in session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

@Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List<String> kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);

  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more


Re: RestClusterClient and classpath

Chesnay Schepler
Well it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
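
One way to rule out the first possibility is to check that the missing class is actually packaged in the fat jar; a quick sketch (hypothetical, not from the thread, using the path and class name mentioned above, run e.g. from a main that throws Exception):

// Hypothetical check: verify that the class reported as missing is present in the fat jar.
try (java.util.jar.JarFile jar = new java.util.jar.JarFile("/tmp/job-bundle.jar")) {
  jar.stream()
      .map(java.util.jar.JarEntry::getName)
      .filter(name -> name.equals("it/test/MyOb.class"))
      .forEach(System.out::println); // prints nothing if the entry is absent
}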


Re: RestClusterClient and classpath

Chesnay Schepler
* your JobExecutor is _not_ putting it on the classpath.


Re: RestClusterClient and classpath

Flavio Pompermaier
In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE), isn't it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio


Re: RestClusterClient and classpath

Flavio Pompermaier
Any help here?  How can I understand why the classes inside the jar are not found when creating the PackagedProgram?


Re: RestClusterClient and classpath

Chesnay Schepler

Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the user code classloader (getPipelineFromProgram()), specifically the call to the main method.

@klou This seems like wrong behavior?

@Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:

final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
   Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
   // do stuff
} finally {
   Thread.currentThread().setContextClassLoader(contextClassLoader);
}
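
Applied to the snippet from the first message, that would look roughly like this (a sketch that reuses packagedProgram and flinkConf from above):

final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final JobGraph jobGraph;
try {
   // make the fat jar's classes visible while the job's main() is invoked
   Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
   jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
} finally {
   Thread.currentThread().setContextClassLoader(contextClassLoader);
}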


Re: RestClusterClient and classpath

Flavio Pompermaier
Always the same problem.

Caused by: java.lang.ClassNotFoundException: it.test.XXX
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 10 more

I've also tried with 

    flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");

but nothing changes.


Re: RestClusterClient and classpath

Kostas Kloudas-5
Hi all,

I will have a look at the whole stack trace in a bit.

@Chesnay Schepler I think that we are setting the correct classloader
during jobgraph creation [1]. Is that what you mean?

Cheers,
Kostas

[1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122

>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>
>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>
>>>>>>     try {
>>>>>>       final RestClusterClient<StandaloneClusterId> client =
>>>>>>           new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>
>>>>>>       final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>           .setJarFile(jarFile)//
>>>>>>           // .setUserClassPaths(userClassPaths)
>>>>>>           .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>           .build();
>>>>>>
>>>>>>       final JobGraph jobGraph =
>>>>>>           PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>
>>>>>>       final DetachedJobExecutionResult jobExecutionResult =
>>>>>>           client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>
>>>>>>       System.out.println(jobExecutionResult.getJobID());
>>>>>>     } catch (Exception ex) {
>>>>>>       ex.printStackTrace();
>>>>>>       System.exit(1);
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>
>>>>
>>>>
>>

Re: RestClusterClient and classpath

Chesnay Schepler
@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think of is that your jar does not contain the
classes, or does not exist at all on the machine your application is run on.

On 10/28/2020 12:08 PM, Kostas Kloudas wrote:

> Hi all,
>
> I will have a look in the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 10 more
>>
>> I've also tried with
>>
>>      flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.


Re: RestClusterClient and classpath

Flavio Pompermaier
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding.. maybe I can try with IntelliJ in the afternoon.


Re: RestClusterClient and classpath

Flavio Pompermaier
No luck with IntelliJ either.. do you have any sample project I can reuse to test the job submission to a cluster?
I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so there's no need to build packaged programs and introduce classpath problems..) but I have 2 questions:
  • I remember that when submitting jobs via REST there's no way to detect failures in the job creation (like missing classes, classpath problems, etc). Am I wrong?
  • I'd like to monitor the progress of my batch job (for example by counting the number of completed vertices wrt the total count of vertices). Is there any suggested way to do that apart from polling?
Best,
Flavio


Re: RestClusterClient and classpath

Chesnay Schepler
1) The job still reaches a failed state, which you can poll for (see 2).
2) Polling is your only way.

What do you mean by "REST-only client"? Do you mean a plain HTTP client, not something that Flink provides?
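As an illustration of 2), a minimal polling loop on top of the RestClusterClient from earlier in the thread could look like the sketch below (the JobID comes from the submitted JobGraph, the 2-second interval is an arbitrary choice, and getJobStatus/requestJobResult should be checked against the client API of the Flink version in use). The same information, including the per-vertex status useful for tracking progress, is also exposed by the REST endpoint GET /jobs/<jobid>.

JobID jobId = jobGraph.getJobID();                // or the id printed after submission
JobStatus status = client.getJobStatus(jobId).get();
while (!status.isGloballyTerminalState()) {
    System.out.println("Job " + jobId + " is in state " + status);
    Thread.sleep(2_000);                          // arbitrary polling interval
    status = client.getJobStatus(jobId).get();
}
if (status == JobStatus.FAILED) {
    // For failed jobs the JobResult carries the serialized root cause.
    JobResult result = client.requestJobResult(jobId).get();
    result.getSerializedThrowable()
          .ifPresent(t -> System.err.println(t.getFullStringifiedStackTrace()));
}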




Re: RestClusterClient and classpath

Chesnay Schepler
To clarify, if the job creation fails on the JM side, in 1.11 the job submission will fail, in 1.12 it will succeed but the job will be in a failed state.


Reply | Threaded
Open this post in threaded view
|

Re: RestClusterClient and classpath

Flavio Pompermaier
For "REST only client" I mean using only the REST API to interact with the Flink cluster, i.e. without creating any PackagedProgram and thus incurring into classpath problems.
I've implemented a running job server that was using the REST API to upload the job jar and execute the run command but then I gave up because I was not able to run any code after env.execute..so I ended up using SSH to the remote server and using the CLI client. This has the limitation of not being able to get the job id and monitor the job status or get back exceptions when deploying the job.
So now I was trying to explore this new way of submitting the job (that computes the jobGraph on the client side and submit it to the cluster).
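As for the job-id/monitoring limitation mentioned above: once a job id is available (from client.submitJob() or from the REST run call), progress can be polled via the monitoring REST API. A rough sketch only, assuming the GET /jobs/:jobid endpoint with its "state" field and a "vertices" array whose entries carry a per-vertex "status"; the HTTP client used here (Spring's RestTemplate) and all other details are illustrative, not taken from the thread:

import java.util.List;
import java.util.Map;

import org.springframework.web.client.RestTemplate;

public class JobStatusPoller {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws InterruptedException {
    final String restBase = "http://localhost:8081";
    final String jobId = args[0]; // job id obtained at submission time

    final RestTemplate rest = new RestTemplate();
    while (true) {
      // GET /jobs/:jobid returns the overall job state plus the per-vertex details
      Map<String, Object> job = rest.getForObject(restBase + "/jobs/" + jobId, Map.class);
      String state = (String) job.get("state");

      // Per-vertex progress: count how many vertices have reached FINISHED
      List<Map<String, Object>> vertices = (List<Map<String, Object>>) job.get("vertices");
      long finished = vertices.stream().filter(v -> "FINISHED".equals(v.get("status"))).count();
      System.out.printf("state=%s, vertices finished=%d/%d%n", state, finished, vertices.size());

      if ("FINISHED".equals(state) || "FAILED".equals(state) || "CANCELED".equals(state)) {
        break; // terminal state reached
      }
      Thread.sleep(5_000);
    }
  }
}

A job whose creation fails on the JobManager side would eventually show up here as FAILED as well, which ties in with the polling discussion quoted further down.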



On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <[hidden email]> wrote:
To clarify, if the job creation fails on the JM side, in 1.11 the job submission will fail, in 1.12 it will succeed but the job will be in a failed state.

On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
1) the job still reaches a failed state, which you can poll for, see 2)
2) polling is your only way.

What do you mean by "REST only client"? Do you mean a plain HTTP client, not something that Flink provides?

On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
No luck with IntelliJ either..do you have any sample project I can reuse to test job submission to a cluster?
I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so no need to build packaged programs and introduce classpath problems..) but I have 2 questions:
  • I remember that when submitting jobs from REST there's no way to detect failures in the job creation (like missing classes, classpath problems, etc). Am I wrong?
  • I'd like to monitor the progress of my batch job (for example I can count the number of completed vertices wrt the total count of vertices). Is there any suggested way to do that apart from polling?
Best,
Flavio

On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <[hidden email]> wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding..maybe I can try IntelliJ in the afternoon

On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <[hidden email]> wrote:
@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application runs on.

On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look in the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 10 more
>>
>> I've also tried with
>>
>>      flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <[hidden email]> wrote:
>>> hmm..it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // do stuff
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
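Applied to the client code from the beginning of this thread, that wrapper might look like the following; a sketch only, reusing the variable names from that snippet:

final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final JobGraph jobGraph;
try {
    // make the user-code classloader visible to anything relying on the context classloader
    Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
    jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
} finally {
    Thread.currentThread().setContextClassLoader(contextClassLoader);
}
// ...then submit jobGraph with the RestClusterClient as before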
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here?  How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <[hidden email]> wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE)..isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm trying to use as a client towards the Flink cluster - session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>>    public static BatchEnv getBatchEnv() {
>>>>>      // TODO use the following when ready to convert from/to datastream
>>>>>      // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>      BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>      customizeEnv(ret);
>>>>>      return new BatchEnv(env, ret);
>>>>>    }
>>>>>
>>>>>    private static void customizeEnv(TableEnvironment ret) {
>>>>>      final Configuration conf = ret.getConfig().getConfiguration();
>>>>>      // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>      conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>      conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>      // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>      // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>>      conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>>      conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>>      final List<String> kryoSerializers = new ArrayList<>();
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>      conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>
>>>>>    }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>> ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <[hidden email]> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However when I submit the job Flink cannot find the classes contained in the "fat" jar..what should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>      final Configuration flinkConf = new Configuration();
>>>>>>>      flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>      flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>      final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>      final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>      try {
>>>>>>>        final RestClusterClient<StandaloneClusterId> client =
>>>>>>>            new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>        final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>            .setJarFile(jarFile)//
>>>>>>>            // .setUserClassPaths(userClassPaths)
>>>>>>>            .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>            .build();
>>>>>>>
>>>>>>>        final JobGraph jobGraph =
>>>>>>>            PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>        final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>            client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>        System.out.println(jobExecutionResult.getJobID());
>>>>>>>      } catch (Exception ex) {
>>>>>>>        ex.printStackTrace();
>>>>>>>        System.exit(1);
>>>>>>>      }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>
>>>>>



Reply | Threaded
Open this post in threaded view
|

Re: RestClusterClient and classpath

Chesnay Schepler
If you aren't setting up the classpath correctly then you cannot create a JobGraph, and cannot use the REST API (outside of uploading jars).
In other words, you _will_ have to solve this issue, one way or another.

FYI, I just tried your code to submit a WordCount jar (the one from the Flink distribution) to a cluster, and it worked flawlessly. Please triple-check your packaging and class references.
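One way to triple-check the packaging is to look inside the fat jar and try to load the reported class in isolation. A small, self-contained sketch; the jar path and class name are taken from the stack trace earlier in the thread, everything else is illustrative:

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.jar.JarFile;

public class JarSanityCheck {
  public static void main(String[] args) throws Exception {
    final File jar = new File("/tmp/job-bundle.jar");
    final String className = "it.test.MyOb"; // the class the NoClassDefFoundError complains about

    // 1) Is the .class entry physically inside the jar?
    String entry = className.replace('.', '/') + ".class";
    try (JarFile jarFile = new JarFile(jar)) {
      System.out.println(entry + " present: " + (jarFile.getEntry(entry) != null));
    }

    // 2) Can it be loaded from the jar alone (no parent application classpath)?
    // This may itself throw NoClassDefFoundError if a superclass or interface is missing,
    // which is a useful signal in its own right.
    try (URLClassLoader loader = new URLClassLoader(new URL[] {jar.toURI().toURL()}, null)) {
      Class<?> clazz = Class.forName(className, false, loader);
      System.out.println("Loadable in isolation: " + clazz.getName());
    }
  }
}

If step 1 prints false, the class never made it into the bundle; if step 1 is true but step 2 fails, a dependency of the class is probably missing from the jar, which matches the "misleading transitive dependency" case discussed later in the thread.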





Reply | Threaded
Open this post in threaded view
|

Re: RestClusterClient and classpath

Flavio Pompermaier
Yes, with the WordCount it works, but that jar is not a "fat" jar (it does not include transitive dependencies).
Actually I was able to use the REST API without creating the JobGraph: you just have to tell the run API the jar id, the main class to run, and the optional parameters.
For this I don't use any official Flink client; I use the Spring RestTemplate and I've implemented the calls to the services myself.
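A condensed sketch of that flow, assuming Flink's /jars/upload and /jars/:jarid/run endpoints, that Jackson is on the classpath so the request map is serialized as JSON, and that the jar id is the last path segment of the "filename" returned by the upload call; the URL, jar path and entry class are the ones used earlier in the thread:

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class RestJobSubmitter {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    final String restBase = "http://localhost:8081";
    final RestTemplate rest = new RestTemplate();

    // 1) Upload the fat jar (POST /jars/upload, multipart field "jarfile")
    MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
    form.add("jarfile", new FileSystemResource(new File("/tmp/job-bundle.jar")));
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.MULTIPART_FORM_DATA);
    Map<String, Object> upload =
        rest.postForObject(restBase + "/jars/upload", new HttpEntity<>(form, headers), Map.class);

    // The "filename" in the response is a server-side path; the jar id is its last segment
    String filename = (String) upload.get("filename");
    String jarId = filename.substring(filename.lastIndexOf('/') + 1);

    // 2) Run it (POST /jars/:jarid/run) with the entry class and optional arguments
    Map<String, Object> runRequest = new HashMap<>();
    runRequest.put("entryClass", "it.flink.MyJob");
    runRequest.put("parallelism", 1);
    // runRequest.put("programArgsList", java.util.List.of("--foo", "bar"));
    Map<String, Object> run =
        rest.postForObject(restBase + "/jars/" + jarId + "/run", runRequest, Map.class);

    // The job id comes back directly, so the job can be monitored afterwards
    System.out.println("Submitted, jobid = " + run.get("jobid"));
  }
}

The "jobid" returned by the run call is exactly what the CLI route could not provide, and it is the id consumed by the polling sketch earlier in the thread.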





Reply | Threaded
Open this post in threaded view
|

Re: RestClusterClient and classpath

Chesnay Schepler
It is irrelevant whether it contains transitive dependencies or not; that's a Maven concept, not a classloading one.

The WordCount main class, which is only contained in that jar, could be found, so the classloading is working. If any other class that is supposed to be in the jar cannot be found, then that means the class is either not in the jar, or some other transitive dependency is missing from the jar. (Class loading exceptions can be a bit misleading at times, particularly when accessing transitive dependencies in static fields IIRC.)
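As a purely hypothetical illustration of that last point (none of these classes exist in the thread): if a class has a static field whose type is missing from the fat jar, initializing it fails with a NoClassDefFoundError that names the missing type, even though the initialized class itself is present:

// Hypothetical classes, only to show how the error can point at a different class
// than the one that is actually absent from the fat jar.
class Missing {                                  // suppose this class did NOT end up in the jar
}

class Outer {
  static final Missing HELPER = new Missing();   // resolved while Outer is being initialized
}

public class MisleadingErrorDemo {
  public static void main(String[] args) {
    // With Missing absent at runtime, this line fails with
    //   java.lang.NoClassDefFoundError: Missing
    // (caused by a ClassNotFoundException), although Outer is on the classpath.
    System.out.println(Outer.HELPER);
  }
}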

> Actually I was able to use the REST API without creating the JobGraph

I'm not debating that, and pointed that out myself.
> [without a job graph you] cannot use the REST API (outside of uploading jars)






Reply | Threaded
Open this post in threaded view
|

Re: RestClusterClient and classpath

Chesnay Schepler
Can you give me more information on your packaging setup / project structure? Is "it.test.MyOb" a test class? Does the dependency containing this class have a "test" scope?

On 10/30/2020 11:34 AM, Chesnay Schepler wrote:
It is irrelevant whether it contains transitive dependencies or not; that's a Maven concept, not a classloading one.

The WordCount main class, which is only contained in that jar, could be found, so the classloading is working. If any other class that is supposed to be in the jar cannot be found, then that means the class is either not in the jar, or some other transitive dependency is missing from the jar. (Class loading exceptions can be a bit misleading at times, particularly when accessing transitive dependencies in static fields IIRC.)

> Actually I was able to use the REST API without creating the JobGraph

I'm not debating that, and pointed that out myself.
> [without a job graph you] cannot use the REST API (outside of uploading jars)

On 10/30/2020 11:22 AM, Flavio Pompermaier wrote:
Yes, with the WordCount it works, but that jar is not a "fat" jar (it does not include transitive dependencies).
Actually I was able to use the REST API without creating the JobGraph: you just have to tell the run API the jar id, the main class to run, and the optional parameters.
For this I don't use any official Flink client; I use the Spring RestTemplate and I've implemented the calls to the services myself.

On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler <[hidden email]> wrote:
If you aren't setting up the classpath correctly then you cannot create a JobGraph, and cannot use the REST API (outside of uploading jars).
In other words, you _will_ have to solve this issue, one way or another.

FYI, I just tried your code to submit a WordCount jar to a cluster (the one in the distribution), and it worked flawlessly. Please triple check your packaging and class references.

On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
For "REST only client" I mean using only the REST API to interact with the Flink cluster, i.e. without creating any PackagedProgram and thus incurring into classpath problems.
I've implemented a running job server that was using the REST API to upload the job jar and execute the run command but then I gave up because I was not able to run any code after env.execute..so I ended up using SSH to the remote server and using the CLI client. This has the limitation of not being able to get the job id and monitor the job status or get back exceptions when deploying the job.
So now I was trying to explore this new way of submitting the job (that computes the jobGraph on the client side and submit it to the cluster).



On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <[hidden email]> wrote:
To clarify, if the job creation fails on the JM side, in 1.11 the job submission will fail, in 1.12 it will succeed but the job will be in a failed state.

On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
1) the job still reaches a failed state, which you can poll for, see 2)
2) polling is your only way.
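For completeness, the polling can be as simple as something like this (only a sketch that would slot into the try block of your earlier main(); I'm going from memory on the client method names, so double-check them against your Flink version):

// needs org.apache.flink.api.common.JobID / JobStatus on the classpath
final JobID jobId = client.submitJob(jobGraph).get();

JobStatus status;
do {
    Thread.sleep(1_000);
    status = client.getJobStatus(jobId).get();
    System.out.println("Job " + jobId + " is in state " + status);
} while (!status.isGloballyTerminalState()); // FINISHED, FAILED or CANCELED

if (status == JobStatus.FAILED) {
    // for failed jobs the job result carries the (serialized) root exception
    client.requestJobResult(jobId).get()
        .getSerializedThrowable()
        .ifPresent(t -> System.err.println(t.getFullStringifiedStackTrace()));
}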

What do you mean by "REST only client"? Do you mean a plain HTTP client, not something that Flink provides?

On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
No luck with IntelliJ either. Do you have any sample project I can reuse to test the job submission to a cluster?
I can't really understand why the classes within the fat jar are not found when generating the JobGraph from the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so no need to build packaged programs and introduce classpath problems), but I have 2 questions:
  • I remember that when submitting jobs via REST there's no way to detect failures during job creation (like missing classes, classpath problems, etc.). Am I wrong?
  • I'd like to monitor the progress of my batch job (for example I can count the number of completed vertices wrt the total count of vertices). Is there any suggested way to do that apart from polling?
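To make the second point concrete, what I have in mind is roughly the following (just a sketch; I'm assuming /jobs/<jobid> still returns a "vertices" array with a per-vertex "status" field, as in the monitoring REST API docs):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JobProgress {

  // prints "<finished>/<total> vertices completed" for the given job id
  public static void main(String[] args) throws Exception {
    final String jobId = args[0]; // the id returned at submission time
    final HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/jobs/" + jobId))
        .GET()
        .build();
    final HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

    final JsonNode vertices = new ObjectMapper().readTree(response.body()).get("vertices");
    long finished = 0;
    for (JsonNode vertex : vertices) {
      if ("FINISHED".equals(vertex.get("status").asText())) {
        finished++;
      }
    }
    System.out.println(finished + "/" + vertices.size() + " vertices completed");
  }
}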
Best,
Flavio

On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <[hidden email]> wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding. Maybe I can try to use IntelliJ in the afternoon.

On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <[hidden email]> wrote:
@Kostas: Ah, I missed that.

@Flavio: the only alternatives I can think of are that your jar does not contain the classes, or that it does not exist at all on the machine your application is run on.
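A quick way to rule out the first option (just a sketch; the entry name below is taken from the stack trace you posted, adjust it to your real class):

import java.util.jar.JarFile;

public class JarCheck {
  public static void main(String[] args) throws Exception {
    // check whether the class reported in the stack trace is actually inside the fat jar
    try (JarFile jar = new JarFile("/tmp/job-bundle.jar")) {
      System.out.println("entry present: " + (jar.getJarEntry("it/test/MyOb.class") != null));
    }
  }
}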

On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look in the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 10 more
>>
>> I've also tried with
>>
>>      flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <[hidden email]> wrote:
>>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // do stuff here, e.g. create the JobGraph via PackagedProgramUtils.createJobGraph(...)
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <[hidden email]> wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE). Isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm trying to use as a client towards the Flink cluster - session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>>    public static BatchEnv getBatchEnv() {
>>>>>      // TODO use the following when ready to convert from/to datastream
>>>>>      // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>      BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>      customizeEnv(ret);
>>>>>      return new BatchEnv(env, ret);
>>>>>    }
>>>>>
>>>>>    private static void customizeEnv(TableEnvironment ret) {
>>>>>      final Configuration conf = ret.getConfig().getConfiguration();
>>>>>      // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>      conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>      conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>      // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>      // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>>      conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>>      conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>>      final List<String> kryoSerializers = new ArrayList<>();
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>      conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>
>>>>>    }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>> ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <[hidden email]> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However when I submit the job Flink cannot find the classes contained in the "fat" jar..what should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>      final Configuration flinkConf = new Configuration();
>>>>>>>      flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>      flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>      final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>      final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>      try {
>>>>>>>        final RestClusterClient<StandaloneClusterId> client =
>>>>>>>            new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>        final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>            .setJarFile(jarFile)//
>>>>>>>            // .setUserClassPaths(userClassPaths)
>>>>>>>            .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>            .build();
>>>>>>>
>>>>>>>        final JobGraph jobGraph =
>>>>>>>            PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>        final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>            client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>        System.out.println(jobExecutionResult.getJobID());
>>>>>>>      } catch (Exception ex) {
>>>>>>>        ex.printStackTrace();
>>>>>>>        System.exit(1);
>>>>>>>      }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>
>>>>>





