Re: RestClusterClient and classpath

Posted by Chesnay Schepler on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/RestClusterClient-and-classpath-tp38955p39027.html

@Kostas: Ah, I missed that.

@Flavio: the only alternative I can think of is that your jar does not contain the
classes, or does not exist at all on the machine your application is run on.
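
To rule out both cases, a quick check along the following lines might help. This is only a sketch: the class name JarClassCheck and the helper containsClass are made up for illustration, and the default jar path and class name are simply the ones from the stack trace quoted below. It relies only on java.util.jar from the JDK and tells you nothing about how the Flink classloaders behave.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, not part of Flink: checks that a jar exists on this
// machine and that it contains a .class entry for a given class name.
final class JarClassCheck {

    /**
     * Returns true only if the jar is an existing regular file and holds an
     * entry for the given fully qualified class name (e.g. "it.test.MyOb").
     */
    public static boolean containsClass(File jar, String className) throws IOException {
        if (!jar.isFile()) {
            // Covers the "jar does not exist at all on this machine" case.
            return false;
        }
        // Jar entries use '/' separators and carry a ".class" suffix.
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            return jarFile.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are assumptions taken from this thread; override via arguments.
        File jar = new File(args.length > 0 ? args[0] : "/tmp/job-bundle.jar");
        String className = args.length > 1 ? args[1] : "it.test.MyOb";
        System.out.println("jar exists: " + jar.isFile());
        System.out.println("contains " + className + ": " + containsClass(jar, className));
    }
}
```

Run it on the same machine (and as the same user) that runs your JobExecutor, so that it sees the same file system as the client does.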

On 10/28/2020 12:08 PM, Kostas Kloudas wrote:

> Hi all,
>
> I will have a look at the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 10 more
>>
>> I've also tried with
>>
>>      flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <[hidden email]> wrote:
>>> Hmm, it appears as if PackagedProgramUtils#createJobGraph does some things outside the usercode classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // call PackagedProgramUtils.createJobGraph(...) here
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I find out why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <[hidden email]> wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE), isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm trying to use as a client towards the Flink cluster - session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>>    @Deprecated
>>>>>    public static BatchEnv getBatchEnv() {
>>>>>      // TODO use the following when ready to convert from/to datastream
>>>>>      // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>      BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>      customizeEnv(ret);
>>>>>      return new BatchEnv(env, ret);
>>>>>    }
>>>>>
>>>>>    private static void customizeEnv(TableEnvironment ret) {
>>>>>      final Configuration conf = ret.getConfig().getConfiguration();
>>>>>      // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>      conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>      conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>      // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>      // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>>      // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>>      conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>>      conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>>      conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>>      final List<String> kryoSerializers = new ArrayList<>();
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>      kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>      conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>>
>>>>>    }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>> ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <[hidden email]> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However, when I submit the job, Flink cannot find the classes contained in the "fat" jar. What should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>      final Configuration flinkConf = new Configuration();
>>>>>>>      flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>      flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>      final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>      final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>      try {
>>>>>>>        final RestClusterClient<StandaloneClusterId> client =
>>>>>>>            new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>        final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>            .setJarFile(jarFile)//
>>>>>>>            // .setUserClassPaths(userClassPaths)
>>>>>>>            .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>            .build();
>>>>>>>
>>>>>>>        final JobGraph jobGraph =
>>>>>>>            PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>        final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>            client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>        System.out.println(jobExecutionResult.getJobID());
>>>>>>>      } catch (Exception ex) {
>>>>>>>        ex.printStackTrace();
>>>>>>>        System.exit(1);
>>>>>>>      }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio