You are right Chesnay, but I'm doing this stuff in parallel with two other things and I messed up the jar name, sorry for that. For the code after env.execute() I'll try to use the new JobListener interface in the next few days. I hope it will be sufficient (I just have to call an external service to update the status of the job).
Best,
Flavio
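A minimal sketch of that JobListener idea, assuming the stock JobListener/registerJobListener API available since Flink 1.10; the external status service (statusClient) is hypothetical:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

public class StatusReportingJob {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerJobListener(new JobListener() {
      @Override
      public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
        // runs on the client once submission succeeds (on failure, throwable != null)
        if (throwable == null) {
          System.out.println("Submitted job " + jobClient.getJobID());
        }
      }

      @Override
      public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
        // runs when env.execute() returns; this is where the hypothetical
        // external status service would be notified, e.g.:
        // statusClient.update(result.getJobID(), throwable == null ? "FINISHED" : "FAILED");
      }
    });

    env.fromElements(1, 2, 3).output(new DiscardingOutputFormat<>()); // placeholder job
    env.execute("status-reporting-job");
  }
}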
On Fri, Oct 30, 2020 at 12:40 PM Chesnay Schepler <[hidden email]> wrote:
Yes, it is definitely way easier to upload & run jars instead of submitting JobGraphs. But I thought this was not viable for you because you cannot execute anything after env.execute()? I believe this limitation still exists. Or are you referring here to error handling in case env.execute() throws an exception (which I think should work)?
Finally, I have to point out that I already entertained the possibility of the jar not being packaged correctly twice in this thread, 2 and 3 days ago respectively. We could've saved some time here had you checked whether the jar actually contains the class.
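For what it's worth, that check takes only a few lines of plain JDK code (a sketch; the jar path and class name are the ones from this thread):

import java.util.jar.JarFile;

public class JarCheck {
  public static void main(String[] args) throws Exception {
    // look up the class entry directly; "jar tf /tmp/job-bundle.jar | grep MyOb" works too
    try (JarFile jar = new JarFile("/tmp/job-bundle.jar")) {
      String entry = "it/test/MyOb.class";
      System.out.println(entry + (jar.getJarEntry(entry) != null ? " found" : " NOT found"));
    }
  }
}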
On 10/30/2020 12:24 PM, Flavio Pompermaier wrote:
I just discovered that I was using the "slim" jar instead of the "fat" one... sorry. Now I'm able to successfully run the program on the remote cluster.
However, the fact of generating the job graph on the client side is something I really don't like at all, because it requires access both to the Flink dist and env variables (such as the Hadoop ones) and to the user jar, which is not really neat to me.
But if I understood correctly, in Flink 1.12 I'll be able to use only the Job Manager REST API to run the job (since the limitation around job submission failures will be handled), is that correct?
Thanks for the support,
Flavio
On Fri, Oct 30, 2020 at 11:37 AM Chesnay Schepler <[hidden email]> wrote:
Can you give me more information on your packaging setup / project structure? Is "it.test.MyOb" a test class? Does the dependency containing this class have a "test" scope?
On 10/30/2020 11:34 AM, Chesnay Schepler wrote:
It is irrelevant whether it contains transitive dependencies or not; that's a Maven concept, not a classloading one.
The WordCount main class, which is only contained in that jar, could be found, so the classloading is working. If any other class that is supposed to be in the jar cannot be found, then that means the class is either not in the jar, or some other transitive dependency is missing from the jar. (Classloading exceptions can be a bit misleading at times, particularly when accessing transitive dependencies in static fields IIRC.)

> Actually I was able to use the REST API without creating the JobGraph

I'm not debating that, and pointed that out myself:

> [without a job graph you] cannot use the REST API (outside of uploading jars)
On 10/30/2020 11:22 AM, Flavio Pompermaier wrote:
Yes, with the WordCount it works, but that jar is not a "fat" jar (it does not include transitive dependencies).
Actually I was able to use the REST API without creating the JobGraph: you just have to tell the run API the jar id, the main class to run and the optional parameters. For this I don't use any official Flink client; I use the Spring REST template and I've implemented the calls to the services by myself.
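A rough sketch of that REST-only submission with Spring's RestTemplate, going against the documented /jars/upload and /jars/:jarid/run endpoints of the JobManager; the response field names ("filename", "jobid") follow the REST API docs and may need adjusting for your Flink version:

import java.util.Map;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

public class RestJobSubmitter {
  public static void main(String[] args) {
    RestTemplate rest = new RestTemplate();
    String base = "http://localhost:8081";

    // 1) upload the fat jar (the multipart form field is named "jarfile")
    MultiValueMap<String, Object> form = new LinkedMultiValueMap<>();
    form.add("jarfile", new FileSystemResource("/tmp/job-bundle.jar"));
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.MULTIPART_FORM_DATA);
    Map<?, ?> upload = rest.postForObject(base + "/jars/upload",
        new HttpEntity<>(form, headers), Map.class);

    // the returned "filename" is a path whose last segment is the jar id
    String filename = (String) upload.get("filename");
    String jarId = filename.substring(filename.lastIndexOf('/') + 1);

    // 2) run it, passing the main class as the entry-class query parameter
    Map<?, ?> run = rest.postForObject(
        base + "/jars/" + jarId + "/run?entry-class={main}", null, Map.class,
        "it.flink.MyJob");
    System.out.println("jobid: " + run.get("jobid"));
  }
}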
On Fri, Oct 30, 2020 at 11:12 AM Chesnay Schepler <[hidden email]> wrote:
If you aren't setting up the classpath correctly then you cannot create a JobGraph, and cannot use the REST API (outside of uploading jars). In other words, you _will_ have to solve this issue, one way or another.
FYI, I just tried your code to submit a WordCount jar to a cluster (the one in the distribution), and it worked flawlessly. Please triple-check your packaging and class references.
On 10/30/2020 10:48 AM, Flavio Pompermaier wrote:
For "REST only client" I
mean using only the REST API to interact
with the Flink cluster, i.e. without
creating any PackagedProgram and thus
incurring into classpath problems.
I've implemented a running job
server that was using the REST API to
upload the job jar and execute the run
command but then I gave up because I
was not able to run any code after
env.execute..so I ended up using SSH
to the remote server and using the CLI
client. This has the limitation of not
being able to get the job id and
monitor the job status or get back
exceptions when deploying the job.
So now I was trying to explore this
new way of submitting the job (that
computes the jobGraph on the client
side and submit it to the cluster).
On Fri, Oct 30, 2020 at 10:32 AM Chesnay Schepler <[hidden email]> wrote:
To clarify: if the job creation fails on the JM side, in 1.11 the job submission will fail; in 1.12 it will succeed, but the job will be in a failed state.
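With the RestClusterClient used later in this thread, that failed state could be checked after submission with something like the following (a sketch; getJobStatus is part of the ClusterClient interface, and JobStatus lives in org.apache.flink.api.common in recent versions):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;

public class JobStatusCheck {
  // polls once; call after submitJob() has returned the job id
  static void checkFailed(RestClusterClient<?> client, JobID jobId) throws Exception {
    JobStatus status = client.getJobStatus(jobId).get();
    if (status == JobStatus.FAILED) {
      System.err.println("Job " + jobId + " reached FAILED state on the JM side");
    }
  }
}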
On 10/30/2020 10:23 AM, Chesnay Schepler wrote:
1) The job still reaches a failed state, which you can poll for, see 2).
2) Polling is your only way.
What do you mean by "REST only client"? Do you mean a plain HTTP client, not something that Flink provides?
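A minimal polling sketch with the plain JDK 11 HttpClient: GET /jobs/<jobid> returns the overall job state plus one entry per vertex, so completed vertices can be counted against the total. The field names ("state", "status") follow the REST API docs; the crude string counting stands in for proper JSON parsing:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobPoller {
  public static void main(String[] args) throws Exception {
    HttpClient http = HttpClient.newHttpClient();
    String url = "http://localhost:8081/jobs/" + args[0]; // job id from submission
    while (true) {
      String body = http.send(HttpRequest.newBuilder(URI.create(url)).build(),
          HttpResponse.BodyHandlers.ofString()).body();
      // crude progress estimate: count vertex entries marked FINISHED
      long finishedVertices = body.split("\"status\":\"FINISHED\"", -1).length - 1L;
      System.out.println("finished vertices: " + finishedVertices);
      if (body.contains("\"state\":\"FINISHED\"") || body.contains("\"state\":\"FAILED\"")) {
        break; // terminal job state reached
      }
      Thread.sleep(5_000);
    }
  }
}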
On 10/30/2020 10:02 AM, Flavio Pompermaier wrote:
Nothing to do with IntelliJ either... do you have any sample project I can reuse to test the job submission to a cluster? I can't really understand why the classes within the fat jar are not found when generating the PackagedProgram.
Ideally, I'd prefer to use a REST-only client (so no need to build packaged programs and introduce classpath problems), but I have 2 questions:
- I remember that when submitting jobs from REST there's no way to detect failures in the job creation (like missing classes, classpath problems, etc). Am I wrong?
- I'd like to monitor the progress of my batch job (for example, I can count the number of completed vertices wrt the total count of vertices). Is there any suggested way to do that apart from polling?
Best,
Flavio
On Wed, Oct 28, 2020 at 12:19 PM Flavio Pompermaier <[hidden email]> wrote:
I'm running the code from Eclipse; the jar exists and it contains the classes Flink is not finding... maybe I can try to use IntelliJ in the afternoon.
On Wed, Oct 28, 2020 at 12:13 PM Chesnay Schepler <[hidden email]> wrote:
@Kostas: Ah, I missed that.
@Flavio: the only alternative I can think of is that your jar does not contain the classes, or does not exist at all on the machine your application is run on.
On 10/28/2020 12:08 PM, Kostas Kloudas wrote:
> Hi all,
>
> I will have a look in the whole stack trace in a bit.
>
> @Chesnay Schepler I think that we are setting the correct classloader
> during jobgraph creation [1]. Is that what you mean?
>
> Cheers,
> Kostas
>
> [1] https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java#L122
>
> On Wed, Oct 28, 2020 at 11:02 AM Flavio Pompermaier
> <[hidden email]> wrote:
>> Always the same problem.
>>
>> Caused by: java.lang.ClassNotFoundException: it.test.XXX
>>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>   ... 10 more
>>
>> I've also tried with
>>
>> flinkConf.set(CoreOptions.CLASSLOADER_RESOLVE_ORDER, "parent-first");
>>
>> but nothing changes.
>>
>> On Wed, Oct 28, 2020 at 10:34 AM Chesnay Schepler <[hidden email]> wrote:
>>> hmm... it appears as if PackagedProgramUtils#createJobGraph does some things outside the usercode classloader (getPipelineFromProgram()), specifically the call to the main method.
>>>
>>> @klou This seems like wrong behavior?
>>>
>>> @Flavio What you could try in the meantime is to wrap the call to createJobGraph like this:
>>>
>>> final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
>>> try {
>>>     Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
>>>     // do stuff
>>> } finally {
>>>     Thread.currentThread().setContextClassLoader(contextClassLoader);
>>> }
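Applied to the createJobGraph call from the client code quoted at the bottom of this thread, that wrapper would look like this (a sketch; packagedProgram and flinkConf are the variables from that code):

final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
final JobGraph jobGraph;
try {
  // run job-graph creation with the user-code classloader as context classloader
  Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
  jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
} finally {
  // always restore the original context classloader
  Thread.currentThread().setContextClassLoader(contextClassLoader);
}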
>>>
>>> On 10/28/2020 10:12 AM, Flavio Pompermaier wrote:
>>>
>>> Any
help here? How can
I understand why the
classes inside the
jar are not found
when creating the
PackagedProgram?
>>>
>>> On Tue,
Oct 27, 2020 at
11:04 AM Flavio
Pompermaier <[hidden email]>
wrote:
>>>> In the logs I see that the jar is in the classpath (I'm trying to debug the program from the IDE)... isn't it?
>>>>
>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>> ...
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler <[hidden email]> wrote:
>>>>> * your JobExecutor is _not_ putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>>>>> Well it happens on the client before you even hit the RestClusterClient, so I assume that either your jar is not packaged correctly or your JobExecutor is putting it on the classpath.
>>>>>
>>>>> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>>>>>
>>>>> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class I'm trying to use as a client towards the Flink cluster - session mode).
>>>>> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>>>>>
>>>>> The code of getBatchEnv is:
>>>>>
>>>>> @Deprecated
>>>>> public static BatchEnv getBatchEnv() {
>>>>>   // TODO use the following when ready to convert from/to datastream
>>>>>   // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>>>>>   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>   BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>>>>>   customizeEnv(ret);
>>>>>   return new BatchEnv(env, ret);
>>>>> }
>>>>>
>>>>> private static void customizeEnv(TableEnvironment ret) {
>>>>>   final Configuration conf = ret.getConfig().getConfiguration();
>>>>>   // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>>>>>   conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>>>>>   conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>>>>>   // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); // NOSONAR
>>>>>   // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f); // NOSONAR
>>>>>   // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2); // NOSONAR
>>>>>   // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2); // NOSONAR
>>>>>   conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0); // NOSONAR
>>>>>   conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min"); // NOSONAR
>>>>>   conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min"); // NOSONAR
>>>>>   conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min"); // NOSONAR
>>>>>   conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10)); // NOSONAR
>>>>>   final List<String> kryoSerializers = new ArrayList<>();
>>>>>   kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>>>>>   kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>>>>>   conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>>>>> }
>>>>>
>>>>> Classpath: [file:/tmp/job-bundle.jar]
>>>>>
>>>>> System.out: (none)
>>>>>
>>>>> System.err: (none)
>>>>>
>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
>>>>>   at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
>>>>> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
>>>>>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
>>>>>   at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
>>>>>   at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>   at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>   at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>>>>   at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
>>>>>   ... 3 more
>>>>> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
>>>>>   at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
>>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>>>>   ... 13 more
>>>>>
>>>>> On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger <[hidden email]> wrote:
>>>>>> Hi Flavio,
>>>>>> can you share the full stacktrace you are seeing? I'm wondering if the error happens on the client or server side (among other questions I have).
>>>>>>
>>>>>> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <[hidden email]> wrote:
>>>>>>> Hi to all,
>>>>>>> I was trying to use the RestClusterClient to submit my job to the Flink cluster.
>>>>>>> However when I submit the job Flink cannot find the classes contained in the "fat" jar... what should I do? Am I missing something in my code?
>>>>>>> This is the current client code I'm testing:
>>>>>>>
>>>>>>> public static void main(String[] args) throws MalformedURLException {
>>>>>>>   final Configuration flinkConf = new Configuration();
>>>>>>>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>>>>>>>   flinkConf.set(RestOptions.PORT, 8081);
>>>>>>>
>>>>>>>   final File jarFile = new File("/tmp/job-bundle.jar");
>>>>>>>   final String jobClass = "it.flink.MyJob";
>>>>>>>
>>>>>>>   try {
>>>>>>>     final RestClusterClient<StandaloneClusterId> client =
>>>>>>>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>>>>>>>
>>>>>>>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>>>>>>>         .setJarFile(jarFile)//
>>>>>>>         // .setUserClassPaths(userClassPaths)
>>>>>>>         .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>>>>>>>         .build();
>>>>>>>
>>>>>>>     final JobGraph jobGraph =
>>>>>>>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>>>>>>>
>>>>>>>     final DetachedJobExecutionResult jobExecutionResult =
>>>>>>>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>>>>>>>
>>>>>>>     System.out.println(jobExecutionResult.getJobID());
>>>>>>>   } catch (Exception ex) {
>>>>>>>     ex.printStackTrace();
>>>>>>>     System.exit(1);
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio