Re: RestClusterClient and classpath
Posted by Flavio Pompermaier
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/RestClusterClient-and-classpath-tp38955p39086.html
Yes, with the WordCount it works, but that jar is not a "fat" jar (it does not include transitive dependencies).
Actually, I was able to use the REST API without creating the JobGraph: you just have to pass the run API the jar ID, the main class to run, and the optional parameters.
For this I don't use any official Flink client. I use the Spring RestTemplate and implemented the service calls myself.
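The REST-only flow described above (upload the jar, then call "run" with the jar ID, main class and arguments) can be sketched with the JDK's `java.net.http.HttpClient` (Java 11+) instead of Spring's RestTemplate. This is a minimal sketch, not Flavio's implementation. It assumes Flink's `POST /jars/upload` endpoint (multipart field `jarfile`) and `POST /jars/:jarid/run` endpoint, assumes the jar ID is the last path segment of the `filename` returned by the upload, and uses a naive regex instead of a JSON library. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class FlinkRestSubmit {

    // Naive extraction of a string field from a JSON body; a real client should use a JSON library.
    public static String jsonField(String json, String field) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field '" + field + "' not found in: " + json);
        }
        return m.group(1);
    }

    // Assumption: the jar ID is the last path segment of the "filename" returned by /jars/upload.
    public static String jarIdFromUploadResponse(String json) {
        String filename = jsonField(json, "filename");
        return filename.substring(filename.lastIndexOf('/') + 1);
    }

    // Minimal JSON string quoting (escapes backslashes and double quotes only).
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Body for POST /jars/:jarid/run: the entry class plus the program arguments.
    public static String runRequestBody(String entryClass, List<String> args) {
        String argList = args.stream().map(FlinkRestSubmit::quote).collect(Collectors.joining(","));
        return "{\"entryClass\":" + quote(entryClass) + ",\"programArgsList\":[" + argList + "]}";
    }

    // Surfaces any non-2xx answer as an exception instead of silently ignoring it.
    private static String bodyIfOk(HttpResponse<String> response) throws IOException {
        if (response.statusCode() / 100 != 2) {
            throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
        }
        return response.body();
    }

    // Uploads the jar, runs its entry class and returns the job ID reported by the cluster.
    public static String submit(String baseUrl, Path jar, String entryClass, List<String> args)
            throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();

        // Hand-built multipart/form-data body with a single "jarfile" part.
        String boundary = "----flink-" + UUID.randomUUID();
        ByteArrayOutputStream multipart = new ByteArrayOutputStream();
        multipart.write(("--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + jar.getFileName() + "\"\r\n"
                + "Content-Type: application/x-java-archive\r\n\r\n").getBytes(StandardCharsets.UTF_8));
        multipart.write(Files.readAllBytes(jar));
        multipart.write(("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));

        HttpRequest upload = HttpRequest.newBuilder(URI.create(baseUrl + "/jars/upload"))
                .header("Content-Type", "multipart/form-data; boundary=" + boundary)
                .POST(HttpRequest.BodyPublishers.ofByteArray(multipart.toByteArray()))
                .build();
        String jarId = jarIdFromUploadResponse(bodyIfOk(http.send(upload, HttpResponse.BodyHandlers.ofString())));

        HttpRequest run = HttpRequest.newBuilder(URI.create(baseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(runRequestBody(entryClass, args)))
                .build();
        return jsonField(bodyIfOk(http.send(run, HttpResponse.BodyHandlers.ofString())), "jobid");
    }
}
```

A call might look like `FlinkRestSubmit.submit("http://localhost:8081", Path.of("/tmp/job-bundle.jar"), "it.flink.MyJob", List.of())`. Because the returned job ID is available to the caller, the job can then be monitored over the same REST API.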
On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler wrote:
If you aren't setting up the classpath correctly, then you cannot create a JobGraph and cannot use the REST API (outside of uploading jars). In other words, you _will_ have to solve this issue, one way or another.
FYI, I just tried your code to submit a WordCount jar to a cluster (the one in the distribution), and it worked flawlessly. Please triple-check your packaging and class references.
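One way to triple-check the packaging, independent of Flink, is to open the fat jar with `java.util.jar.JarFile` and look for the class entry directly. This is a minimal sketch; the class name `JarClassCheck` is hypothetical, and the defaults (`/tmp/job-bundle.jar`, `it.test.MyOb`) are taken from later in the thread. Finding the entry only shows that the class bytes are in the jar; it does not prove that everything that class depends on is there too.

```java
import java.io.IOException;
import java.util.jar.JarFile;

public class JarClassCheck {

    // Converts a binary class name such as "it.test.MyOb" into the jar entry name "it/test/MyOb.class".
    public static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns true if the jar contains an entry for the given class.
    public static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jarPath = args.length > 0 ? args[0] : "/tmp/job-bundle.jar";
        String className = args.length > 1 ? args[1] : "it.test.MyOb";
        System.out.println(className + " in " + jarPath + ": " + containsClass(jarPath, className));
    }
}
```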
On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
By "REST only client" I mean using only the REST API to interact with the Flink cluster, i.e. without creating any PackagedProgram and thus without running into classpath problems.
I had implemented a job server that used the REST API to upload the job jar and execute the run command, but then I gave up because I was not able to run any code after env.execute(). So I ended up using SSH to the remote server and running the CLI client. This has the limitation of not being able to get the job ID, monitor the job status, or get back exceptions when deploying the job.
So now I was trying to explore this new way of submitting the job (which computes the JobGraph on the client side and submits it to the cluster).
On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler wrote:
To clarify: if the job creation fails on the JM side, in 1.11 the job submission will fail, while in 1.12 it will succeed but the job will be in a failed state.
On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
1) The job still reaches a failed state, which you can poll for; see 2).
2) Polling is your only way.
What do you mean by "REST only client"? Do you mean a plain HTTP client, not something that Flink provides?
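Since polling is the only way, a polling loop over the REST API might look like the following sketch. It assumes the `GET /jobs/:jobid` endpoint and its job-level `state` field; the class and method names are hypothetical, the JSON is read with a naive regex, and there is no overall timeout.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobStatePoller {

    // Globally terminal job states; SUSPENDED is only locally terminal and is therefore not listed.
    private static final Set<String> TERMINAL_STATES = Set.of("FINISHED", "FAILED", "CANCELED");

    private static final Pattern STATE = Pattern.compile("\"state\"\\s*:\\s*\"([A-Z_]+)\"");

    // Reads the job-level "state" field of a GET /jobs/:jobid response (naive regex, no JSON library).
    public static String jobState(String json) {
        Matcher m = STATE.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no \"state\" field in: " + json);
        }
        return m.group(1);
    }

    public static boolean isTerminal(String state) {
        return TERMINAL_STATES.contains(state);
    }

    // Polls GET /jobs/:jobid at a fixed interval until the job reaches a terminal state, then returns it.
    public static String awaitTerminalState(String baseUrl, String jobId, Duration interval)
            throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId)).GET().build();
        while (true) {
            HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
            if (response.statusCode() != 200) {
                throw new IOException("GET /jobs/" + jobId + " returned HTTP " + response.statusCode());
            }
            String state = jobState(response.body());
            if (isTerminal(state)) {
                return state;
            }
            Thread.sleep(interval.toMillis());
        }
    }
}
```

If the returned state is `FAILED`, the `GET /jobs/:jobid/exceptions` endpoint is the place to look for the root cause.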
On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
Nothing to do with IntelliJ either... do you have any sample project I can reuse to test the job submission to a cluster?
I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so no need to build packaged programs and introduce classpath problems...), but I have 2 questions:
- I remember that when submitting jobs via REST there's no way to detect failures in the job creation (like missing classes, classpath problems, etc.). Am I wrong?
- I'd like to monitor the progress of my batch job (for example, by counting the number of completed vertices w.r.t. the total count of vertices). Is there any suggested way to do that apart from polling?
Best,
Flavio
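For the progress question, the same `GET /jobs/:jobid` response lists the job vertices, each with a `status`. Assuming the vertex entries are the only `"status"` string fields in that response, the finished/total ratio Flavio describes could be computed as in this sketch. It still has to be polled, the JSON is read with a naive regex, and the names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobProgress {

    // Matches "status":"<STATE>" pairs; "status-counts" does not match because the key must end right after "status".
    private static final Pattern VERTEX_STATUS = Pattern.compile("\"status\"\\s*:\\s*\"([A-Z_]+)\"");

    // Returns {finished, total} job vertices found in a GET /jobs/:jobid response body.
    public static int[] finishedAndTotal(String json) {
        Matcher m = VERTEX_STATUS.matcher(json);
        int finished = 0;
        int total = 0;
        while (m.find()) {
            total++;
            if (m.group(1).equals("FINISHED")) {
                finished++;
            }
        }
        return new int[] {finished, total};
    }

    // Fraction of finished vertices, or 0.0 if no vertex was found.
    public static double progress(String json) {
        int[] counts = finishedAndTotal(json);
        return counts[1] == 0 ? 0.0 : (double) counts[0] / counts[1];
    }
}
```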
On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding... maybe I can try IntelliJ in the afternoon.
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler wrote:
@Kostas: Ah, I missed that.
@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application is run on.
On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look at the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during JobGraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>> ... 10 more
>>
>> I've also tried with
>>
>> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler wrote:
>>> Hmm... it appears as if PackagedProgramUtils#createJobGraph does some things outside the user-code classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // do stuff, e.g. PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true)
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
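Chesnay's workaround can be packaged as a small reusable helper that swaps the thread's context class loader for the duration of one call and always restores the previous one. This is a sketch; the class and method names are hypothetical. Taking a `Callable` lets checked exceptions thrown by the wrapped call propagate unchanged.

```java
import java.util.concurrent.Callable;

public final class ContextClassLoaders {

    private ContextClassLoaders() {}

    // Runs the action with the given context class loader and restores the previous loader afterwards,
    // even if the action throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```

Applied to the client code in this thread, the call would read `ContextClassLoaders.withContextClassLoader(packagedProgram.getUserCodeClassLoader(), () -> PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true))`.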
>>>
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any help here? How can I understand why the classes inside the jar are not found when creating the PackagedProgram?
>>>
>>> On Tue, Oct 27, 2020 at 11:04 AM Flavio Pompermaier wrote:
>>>> In the logs I see that the jar is on the classpath (I'm trying to debug the program from the IDE)... isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>>
>>>>> Well, it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is the main class I'm trying to use as a client towards the Flink cluster, in session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>> public static BatchEnv getBatchEnv() {
>>>>>     // TODO use the following when ready to convert from/to datastream
>>>>>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>     customizeEnv(ret);
>>>>>     return new BatchEnv(env, ret);
>>>>> }
>>>>>
>>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>     final Configuration conf = ret.getConfig().getConfiguration();
>>>>>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>>>>>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>>>>>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>>>>>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>>>>>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>>>>>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>>>>>     final List<String> kryoSerializers = new ArrayList<>();
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>> }
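`customizeEnv` calls a `getKryoSerializerConfigLine` helper that is not shown in the thread. The sketch below is a hypothetical reimplementation, assuming the documented entry format of `pipeline.default-kryo-serializers` (`class:<type>,serializer:<serializer>`, with entries separated by `;` when written as a single configuration value). It is not the author's code. The `String` overload is an illustrative variant: building the entry from class names does not require the caller to reference the class literals.

```java
import java.util.List;

public final class KryoConfig {

    private KryoConfig() {}

    // Hypothetical stand-in for the getKryoSerializerConfigLine helper used in customizeEnv.
    public static String kryoSerializerConfigLine(Class<?> type, Class<?> serializer) {
        return kryoSerializerConfigLine(type.getName(), serializer.getName());
    }

    // Same entry built from class names, without referencing the classes themselves.
    public static String kryoSerializerConfigLine(String typeName, String serializerName) {
        return "class:" + typeName + ",serializer:" + serializerName;
    }

    // Joins entries into a single configuration value, using ';' as the list separator.
    public static String asConfigValue(List<String> entries) {
        return String.join(";", entries);
    }
}
```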
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>> at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>> at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>> at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>> ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>> at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>> at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>> ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stack trace you are seeing? I'm wondering if the error happens on the client or the server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However, when I submit the job, Flink cannot find the classes contained in the "fat" jar... what should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>     final Configuration flinkConf = new Configuration();
>>>>>>>     flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>     flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>     final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>     final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>     try {
>>>>>>>         final RestClusterClient<StandaloneClusterId> client =
>>>>>>>             new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>         final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>             .setJarFile(jarFile)//
>>>>>>>             // .setUserClassPaths(userClassPaths)
>>>>>>>             .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>             .build();
>>>>>>>
>>>>>>>         final JobGraph jobGraph =
>>>>>>>             PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>         final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>             client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>         System.out.println(jobExecutionResult.getJobID());
>>>>>>>     } catch (Exception ex) {
>>>>>>>         ex.printStackTrace();
>>>>>>>         System.exit(1);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>
>>>>>