How to submit a job with dependency jars by flink cli in Flink 1.4.2?


Joshua Fan
Hi,

I'd like to submit a job with dependency jars using flink run, but it failed.

Here is the script:

/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
-m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
-c StreamExample \
-C file:/home/work/xxx/lib/commons-math3-3.5.jar \
-C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
...
xxx-1.0.jar

As described in https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage , "-C" is the option for providing dependency jars.

After I execute the command, the job is submitted successfully, but it cannot run in the Flink cluster on YARN. The exception is as follows:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
ClassLoader info: URL ClassLoader:
    file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
    file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' (missing)
    .......
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:745)

It appears that the two dependency jars cannot be found on the TaskManager, so I dug into the source code, following the path from CliFrontend to PackagedProgram to ClusterClient to JobGraph. It seems that the dependency jars are put on the classpath and into the userCodeClassLoader in PackagedProgram, but are never uploaded to the BlobServer via the JobGraph, which is where xxx-1.0.jar gets uploaded.
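A minimal sketch of that failure mode, assuming the TaskManager's user-code classloader behaves like a plain `java.net.URLClassLoader` over the `-C` URLs (the class name `MissingJarDemo` and the `/nonexistent/...` path are hypothetical). The loader is created without complaint even though the jar does not exist on the local machine. The error only appears when a class is requested, which matches the "(missing)" entries and the "Cannot load user class" message above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;

public class MissingJarDemo {

    /** True if className resolves through a URLClassLoader built from the given URL strings. */
    public static boolean canLoad(String className, String... urlSpecs) {
        URL[] urls = new URL[urlSpecs.length];
        for (int i = 0; i < urlSpecs.length; i++) {
            try {
                urls[i] = new URL(urlSpecs[i]);
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Bad URL: " + urlSpecs[i], e);
            }
        }
        // Creating the loader does not open the files, so missing jars go unnoticed here.
        try (URLClassLoader loader = new URLClassLoader(urls)) {
            // Delegates to the parent first, so JDK classes still resolve.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            // Classes that exist only in the missing jars end up here.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String missingJar = "file:/nonexistent/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar";
        System.out.println(canLoad("java.lang.String", missingJar)); // prints true
        System.out.println(canLoad(
                "org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08", missingJar)); // prints false
    }
}
```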

Am I missing something? Are dependency jars not supported in Flink 1.4.2?

I hope someone can give me a hint.

I appreciate it very much.


Yours Sincerely

Joshua





Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

Piotr Nowojski
Hi,

Are those paths:
    file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
    file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' (missing)

accessible from the inside of your container? 

bin/flink run --help
(…)
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
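Since `-C` only adds URLs to each classloader, every node has to be able to resolve them on its own. Below is a hypothetical pre-flight check that could be run on a node to list which `file:` entries would be missing there. `ClasspathCheck` and `missingFileEntries` are illustrative names, not Flink APIs.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ClasspathCheck {

    /**
     * Returns the file: entries that do not resolve to an existing file on this machine.
     * Entries with other schemes (http:, ftp:, ...) are skipped: they cannot be checked locally.
     */
    public static List<String> missingFileEntries(List<String> urlSpecs) {
        List<String> missing = new ArrayList<>();
        for (String spec : urlSpecs) {
            try {
                URI uri = new URI(spec);
                if ("file".equalsIgnoreCase(uri.getScheme()) && !Files.exists(Paths.get(uri))) {
                    missing.add(spec);
                }
            } catch (URISyntaxException | IllegalArgumentException e) {
                // Unparseable, or not convertible to a local path (e.g. a file: URI with a host part).
                missing.add(spec);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Example: java ClasspathCheck file:/home/work/xxx/lib/commons-math3-3.5.jar
        for (String entry : missingFileEntries(Arrays.asList(args))) {
            System.out.println(entry + " (missing)");
        }
    }
}
```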

Another nit: maybe the problem is the single slash after “file:”. You have
file:/home/...
while it might need to be

Piotrek
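The slash question can be checked against `java.net.URL` itself, which the help text names as the authority on supported protocols. In this sketch (`FileUrlSlashes` is a hypothetical name), both spellings yield the same path. The "ClassLoader info" in the original exception also shows the path parsed as '/home/work/xxx/lib/commons-math3-3.5.jar'. This suggests, without proving it, that the single slash is not the cause, because Flink's own handling of the value is not reproduced here.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class FileUrlSlashes {

    /** Returns the path component that java.net.URL extracts from the given spec. */
    public static String pathOf(String spec) {
        try {
            return new URL(spec).getPath();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Bad URL: " + spec, e);
        }
    }

    public static void main(String[] args) {
        // Single slash, as in the original command
        System.out.println(pathOf("file:/home/work/xxx/lib/commons-math3-3.5.jar"));
        // Empty authority, i.e. three slashes
        System.out.println(pathOf("file:///home/work/xxx/lib/commons-math3-3.5.jar"));
        // Both lines print /home/work/xxx/lib/commons-math3-3.5.jar
    }
}
```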

On 3 Aug 2018, at 13:03, Joshua Fan <[hidden email]> wrote:

[...]

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

Paul Lam
In reply to this post by Joshua Fan


> On 3 Aug 2018, at 19:03, Joshua Fan <[hidden email]> wrote:
>
> [...]

Hi Joshua,

I think what you’re looking for is the `-yt` option, which distributes a specified directory to the TaskManager nodes via YARN. You can find its description in the Flink client help by running `bin/flink`.

> -yt,--yarnship <arg>                 Ship files in the specified directory (t for transfer)

Best Regards,
Paul Lam

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

Piotr Nowojski
In reply to this post by Piotr Nowojski
Hi,

 -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)

I guess that you even got a warning in your log files:

LOG.warn("Ship directory is not a directory. Ignoring it.");
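The warning suggests that the `-yt` argument is only shipped when it names an existing local directory. Below is a sketch of that rule, assuming the argument is interpreted as a plain filesystem path. `ShipDirCheck` is a hypothetical class, not Flink's implementation. Under that assumption, neither a single jar nor a `file:`-prefixed string would be shipped.

```java
import java.io.File;

public class ShipDirCheck {

    /**
     * Assumed rule, modelled on the quoted warning: the argument is shipped only
     * if it names an existing directory on the local filesystem.
     */
    public static boolean wouldShip(String shipPath) {
        File shipDir = new File(shipPath);
        if (shipDir.isDirectory()) {
            return true;
        }
        System.err.println("Ship directory is not a directory. Ignoring it.");
        return false;
    }

    public static void main(String[] args) {
        // A "file:" URL string is treated as a relative path named "file:/...", and a jar is a file,
        // so under this rule neither would be shipped.
        System.out.println(wouldShip("file:/home/work/xxx/lib/commons-math3-3.5.jar"));
        // A plain directory path is shipped if it exists on this machine.
        System.out.println(wouldShip("/home/work/xxx/lib/"));
    }
}
```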

I’m not sure, but maybe with `-yt` you do not even need to specify `-C`; just `-yt /home/work/xxx/lib/` should suffice.

Piotrek

On 3 Aug 2018, at 14:41, Joshua Fan <[hidden email]> wrote:

Hi Piotr

I have given up on using big C (-C) for this. -C requires the value to be a Java URL, but java.net.URL only supports the file, ftp, gopher, http, https, jar, mailto, and netdoc protocols by default. That's why I cannot use an HDFS location.
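This is easy to reproduce. Unless a `URLStreamHandler` for a scheme has been registered (for example through `URL.setURLStreamHandlerFactory`), constructing a `java.net.URL` with that scheme throws `MalformedURLException`. If the CLI parses `-C` values with `new URL(...)`, that would explain the CliFrontend error with an `hdfs://` location. This is an assumption; the parsing code is not quoted in this thread. A sketch with the hypothetical class `UrlSchemeProbe`:

```java
import java.net.MalformedURLException;
import java.net.URL;

public class UrlSchemeProbe {

    /** Returns null if spec parses as a java.net.URL, otherwise the exception message. */
    public static String parseError(String spec) {
        try {
            new URL(spec);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // file: has a built-in handler, so this prints null
        System.out.println(parseError("file:/home/work/xxx/lib/commons-math3-3.5.jar"));
        // No handler is registered for hdfs: in a plain JVM, so this prints "unknown protocol: hdfs"
        System.out.println(parseError("hdfs://namenode1/home/work/xxx/lib/commons-math3-3.5.jar"));
    }
}
```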

For the -yt option, I think I need to do something more.

Yours
Joshua

On Fri, Aug 3, 2018 at 8:11 PM, Joshua Fan <[hidden email]> wrote:
Hi Piotr

I just tried the -yt option as you suggested, changing -C file:/home/work/xxx/lib/commons-math3-3.5.jar to -yt file:/home/work/xxx/lib/commons-math3-3.5.jar, but now the job fails even to submit, reporting this exception:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)

With -C, the job can be submitted but cannot run in the cluster on YARN; with -yt, it cannot even be submitted.
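One plausible reading of the submit-time ClassNotFoundException is offered here as an assumption, not something confirmed in this thread. The client may execute the job's `main()` with a classloader built from the job jar plus the `-C` URLs, while `-yt` only ships files to the YARN containers. If so, replacing `-C` with `-yt` leaves the connector invisible on the client. The sketch below shows the classloader side of that with a resource lookup. `ClientLoaderDemo`, the temporary jars, and their placeholder entries are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

public class ClientLoaderDemo {

    static final String CONNECTOR_ENTRY =
            "org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.class";

    /** Writes a temporary jar with one entry holding placeholder bytes (not a real class). */
    static Path jarWithEntry(String entryName) throws IOException {
        Path jar = Files.createTempFile("demo", ".jar");
        jar.toFile().deleteOnExit();
        try (OutputStream out = Files.newOutputStream(jar);
             JarOutputStream jos = new JarOutputStream(out)) {
            jos.putNextEntry(new JarEntry(entryName));
            jos.write(new byte[] {0});
            jos.closeEntry();
        }
        return jar;
    }

    /** True if the connector entry is visible through a loader built from the given jars. */
    static boolean connectorVisible(Path... jars) throws IOException {
        URL[] urls = new URL[jars.length];
        for (int i = 0; i < jars.length; i++) {
            urls[i] = jars[i].toUri().toURL();
        }
        try (URLClassLoader loader = new URLClassLoader(urls)) {
            return loader.getResource(CONNECTOR_ENTRY) != null;
        }
    }

    /** Returns {job jar only, job jar plus a -C style entry}. */
    public static boolean[] run() {
        try {
            Path jobJar = jarWithEntry("StreamExample.class");
            Path connectorJar = jarWithEntry(CONNECTOR_ENTRY);
            boolean jobJarOnly = connectorVisible(jobJar);
            boolean withClasspathEntry = connectorVisible(jobJar, connectorJar);
            return new boolean[] {jobJarOnly, withClasspathEntry};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println(r[0] + " " + r[1]); // prints: false true
    }
}
```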

I am trying to change "-C file:/home/work/xxx/lib/commons-math3-3.5.jar" to "-C hdfs://namenode1/home/work/xxx/lib/commons-math3-3.5.jar", but CliFrontend reported an error.
I am still working on it and will report back later.

Yours
Joshua

On Fri, Aug 3, 2018 at 7:58 PM, Piotr Nowojski <[hidden email]> wrote:
Hi Joshua,

Please try (as Paul suggested) using:

     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)

I guess `-yt /home/work/xxx` should solve your problem :)

Piotrek

On 3 Aug 2018, at 13:54, Joshua Fan <[hidden email]> wrote:

Hi Piotr

Thank you for your advice. I submit the dependency jars from my local machine; they do not exist on the YARN container machines. Maybe I misunderstood the big C (-C) option; it cannot do such a thing.

Joshua  

On Fri, Aug 3, 2018 at 7:17 PM, Piotr Nowojski <[hidden email]> wrote:

[...]

Re: How to submit a job with dependency jars by flink cli in Flink 1.4.2?

Piotr Nowojski
Hi,

I’m glad that you have found a solution to your problem :)

To shorten the feedback loop, you can (and should) test as much logic as possible with small unit tests and some small-scale integration tests (see https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html). Usually there is no need to start a full Flink cluster and submit your work-in-progress job during development. You can run such end-to-end tests just once, before committing, pushing, merging, or deploying.
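Here is a sketch illustrating that advice. `EventParser` and its input format are hypothetical. When business logic lives in plain methods, it can be checked in milliseconds without a cluster, and the Flink function that calls it stays a thin wrapper. Testing Flink functions and operators themselves is covered by the guide linked above.

```java
import java.math.BigDecimal;

public class EventParser {

    /**
     * Hypothetical business rule: parse "user,amount" and return the amount in cents.
     * BigDecimal avoids binary floating-point rounding; more than two decimals throws ArithmeticException.
     */
    public static long amountInCents(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected 'user,amount' but got: " + line);
        }
        return new BigDecimal(parts[1].trim()).movePointRight(2).longValueExact();
    }

    public static void main(String[] args) {
        // A check that needs no Flink cluster; in a real project this would be a JUnit test,
        // and a Flink MapFunction would simply call amountInCents.
        System.out.println(amountInCents("alice, 12.34")); // prints 1234
    }
}
```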

Piotrek 

On 6 Aug 2018, at 10:03, Joshua Fan <[hidden email]> wrote:

Hi Piotr

Thank you for your kind suggestion.

Yes, there was indeed a warning when a path like file:// was set. I later set -yt to a directory, and the jars in it were uploaded to the TaskManager, but the flink run command still failed to submit the job because of a ClassNotFoundException.

I finally realized that Flink expects the user to submit a fat jar containing the job and its dependencies, rather than offering a way to upload dependencies dynamically at submission time.
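With the fat-jar approach, a quick sanity check before submitting is whether the dependency classes actually ended up inside the job jar. The sketch below uses `java.util.jar.JarFile`. The class `FatJarCheck` is hypothetical, and building the fat jar itself (e.g. with the Maven Shade plugin) is outside this snippet.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.JarFile;

public class FatJarCheck {

    /** Converts a binary class name to the entry name it would have inside a jar. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** True if the jar at jarPath contains an entry for the given class. */
    public static boolean bundles(String jarPath, String className) {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: java FatJarCheck <jar> <class name>");
            return;
        }
        System.out.println(bundles(args[0], args[1]));
    }
}
```

For example, `java FatJarCheck xxx-1.0.jar org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08` should print `true` if the connector was bundled into the job jar.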

That is fine when submitting a job to a production environment. In a test environment, however, users may change the business logic many times, and they do not want to wait a long time to test it in Flink (building the fat jar with Maven, transferring it to a Flink client node, and running it; I have to admit it takes a long time).

It seems I have to find a way to shorten the time my users spend on this.

Yours Sincerely

Joshua

On Fri, Aug 3, 2018 at 9:08 PM, Piotr Nowojski <[hidden email]> wrote:

[...]