Re: RestClusterClient and classpath
Posted by Flavio Pompermaier
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/RestClusterClient-and-classpath-tp38955p39083.html
For "REST only client" I mean using only the REST API to interact with the Flink cluster, i.e. without creating any PackagedProgram and thus incurring into classpath problems.
I've implemented a running job server that was using the REST API to upload the job jar and execute the run command but then I gave up because I was not able to run any code after env.execute..so I ended up using SSH to the remote server and using the CLI client. This has the limitation of not being able to get the job id and monitor the job status or get back exceptions when deploying the job.
So now I was trying to explore this new way of submitting the job (that computes the jobGraph on the client side and submit it to the cluster).
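For reference, a rough sketch of what such a REST-only submission could look like. All of it is an assumption on my side rather than code from this thread: it presumes a session cluster with its REST endpoint on localhost:8081, the /jars/<jarid>/run endpoint, and a jar id previously obtained from a multipart POST to /jars/upload (the id below is hypothetical):

    // Hedged sketch of a "REST only" client: no PackagedProgram, just HTTP calls.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class RestOnlySubmit {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:8081";      // assumed REST address of the session cluster
            String jarId = "<jar-id>";                  // hypothetical: taken from the POST /jars/upload response
            String entryClass = "it.flink.MyJob";       // entry point used elsewhere in this thread

            // POST /jars/<jarid>/run starts the job; the JSON response should carry the job id,
            // which can then be polled via GET /jobs/<jobid>.
            HttpClient http = HttpClient.newHttpClient();
            HttpRequest run = HttpRequest.newBuilder(
                    URI.create(base + "/jars/" + jarId + "/run?entry-class=" + entryClass + "&parallelism=1"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
            HttpResponse<String> response = http.send(run, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());        // e.g. {"jobid":"..."}
        }
    }

With the job id in hand, the monitoring gap of the SSH/CLI approach would go away, since the status can be queried over REST as well.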
On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <[hidden email]> wrote:
To clarify: if the job creation fails on the JM side, in 1.11 the job submission will fail, while in 1.12 it will succeed but the job will be in a failed state.
On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
1) the job still reaches a failed state, which you can poll for, see 2)
2) polling is your only way.
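For illustration, polling with the RestClusterClient might look roughly like this (a sketch assuming Flink 1.11.x APIs, not code from this thread; the helper class name is made up):

    // Poll the job status until it reaches a globally terminal state (FINISHED, FAILED, CANCELED).
    import org.apache.flink.api.common.JobID;
    import org.apache.flink.api.common.JobStatus;
    import org.apache.flink.client.program.rest.RestClusterClient;

    public final class JobStatusPoller {
        public static JobStatus waitForTermination(RestClusterClient<?> client, JobID jobId)
                throws Exception {
            while (true) {
                JobStatus status = client.getJobStatus(jobId).get();  // one REST round-trip per poll
                if (status.isGloballyTerminalState()) {
                    return status;
                }
                Thread.sleep(5_000);                                  // arbitrary polling interval
            }
        }
    }

The JobID returned by client.submitJob(jobGraph) in the client code at the bottom of this thread could be fed straight into such a loop.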
What do you mean with "REST only
client"? Do you mean a plain http client, not something that
Flink provides?
On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
No luck with IntelliJ either... do you have any sample project I can reuse to test job submission to a cluster? I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so there is no need to build packaged programs and introduce classpath problems...), but I have 2 questions:
- I remember that when submitting jobs via REST there's no way to detect failures in the job creation (like missing classes, classpath problems, etc.). Am I wrong?
- I'd like to monitor the progress of my batch job (for example by counting the number of completed vertices against the total number of vertices, as in the sketch below). Is there any suggested way to do that apart from polling?
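A rough sketch of that vertex count, purely as an assumption on my side (it presumes the GET /jobs/<jobid> response contains a "vertices" array with a "status" field per vertex, and that Jackson is on the classpath):

    // Count finished vertices against the total for a running batch job.
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class BatchProgress {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:8081";   // assumed REST address
            String jobId = args[0];                  // job id obtained at submission time

            HttpClient http = HttpClient.newHttpClient();
            HttpRequest details = HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId)).GET().build();
            JsonNode json = new ObjectMapper()
                .readTree(http.send(details, HttpResponse.BodyHandlers.ofString()).body());

            int total = json.get("vertices").size();
            int finished = 0;
            for (JsonNode vertex : json.get("vertices")) {
                if ("FINISHED".equals(vertex.get("status").asText())) {
                    finished++;
                }
            }
            System.out.printf("completed vertices: %d/%d%n", finished, total);
        }
    }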
Best,
Flavio
On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <[hidden email]> wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding... maybe I can try to use IntelliJ in the afternoon.
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <[hidden email]> wrote:
@Kostas: Ah, I missed that.
@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application runs on.
On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look at the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>     ... 10 more
>>
>> I've also tried with
>>
>> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <[hidden email]> wrote:
>>> hmm.. it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // do stuff (i.e. the createJobGraph call)
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
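Applied to the submission code at the bottom of this thread, that could be factored into a small helper along these lines (a sketch, not a verified fix; the class and method names are made up):

    // Build the JobGraph with the program's user-code classloader installed as the thread
    // context classloader, restoring the original one afterwards.
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.PackagedProgramUtils;
    import org.apache.flink.client.program.ProgramInvocationException;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public final class JobGraphs {
        public static JobGraph createWithUserClassLoader(PackagedProgram program, Configuration conf,
                int parallelism) throws ProgramInvocationException {
            final ClassLoader original = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(program.getUserCodeClassLoader());
                return PackagedProgramUtils.createJobGraph(program, conf, parallelism, true);
            } finally {
                Thread.currentThread().setContextClassLoader(original);
            }
        }
    }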
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier <[hidden email]> wrote:
>>>> In the logs I see that the jar is in the classpath (I'm trying to debug the program from the IDE)... isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
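A quick way to rule out the packaging question would be to load the class straight from the jar with a plain URLClassLoader (a hypothetical check, not something from this thread; the class name and jar path are taken from the stack trace below):

    // Verify that the fat jar actually contains the class Flink reports as missing.
    import java.net.URL;
    import java.net.URLClassLoader;

    public class JarCheck {
        public static void main(String[] args) throws Exception {
            URL jar = new java.io.File("/tmp/job-bundle.jar").toURI().toURL();
            // null parent: only bootstrap classes plus the jar itself are visible,
            // so a successful load shows the class (and its superclasses) are in the jar.
            try (URLClassLoader loader = new URLClassLoader(new URL[]{jar}, null)) {
                System.out.println(loader.loadClass("it.test.MyOb"));
            }
        }
    }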
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster - session mode). it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>> public static BatchEnv getBatchEnv() {
>>>>>     // TODO use the following when ready to convert from/to datastream
>>>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>     customizeEnv(ret);
>>>>>     return new BatchEnv(env, ret);
>>>>> }
>>>>>
>>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); //NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); //NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
>>>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>> }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>     at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>     at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>     at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>     ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>     at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>     ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <[hidden email]> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster. However, when I submit the job Flink cannot find the classes contained in the "fat" jar... what should I do? Am I missing something in my code? This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>     try {
>>>>>>>         final RestClusterClient<StandaloneClusterId> client =
>>>>>>>             new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>         final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>             .setJarFile(jarFile)//
>>>>>>>             // .setUserClassPaths(userClassPaths)
>>>>>>>             .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>             .build();
>>>>>>>
>>>>>>>         final JobGraph jobGraph =
>>>>>>>             PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>         final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>             client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>         System.out.println(jobExecutionResult.getJobID());
>>>>>>>     } catch (Exception ex) {
>>>>>>>         ex.printStackTrace();
>>>>>>>         System.exit(1);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>
>>>>>