Hi,
I'd like to submit a job with dependency jars via `flink run`, but it failed. Here is the script:

/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
  -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
  -c StreamExample \
  -C file:/home/work/xxx/lib/commons-math3-3.5.jar \
  -C file:/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar \
  ...
  xxx-1.0.jar

As described in https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/cli.html#usage , "-C" is the option for providing dependency jars.

After I execute the command, the job is submitted successfully, but it cannot run in the Flink cluster on YARN. The exception is like below:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
ClassLoader info: URL ClassLoader:
    file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
    file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' (missing)
    .......
Class not resolvable through given classloader.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:745)

It appears that the two dependency jars cannot be found on the TaskManager, so I dug into the source code, from CliFrontend to PackagedProgram to ClusterClient to JobGraph. It seems that the dependency jars are put on the classpath and into the userCodeClassLoader in PackagedProgram, but are never uploaded to the BlobServer in JobGraph, where xxx-1.0.jar is uploaded.

Am I missing something? Are dependency jars not supported in Flink 1.4.2?

Hope someone can give me some hint. I appreciate it very much.

Yours Sincerely,
Joshua
Hi,
Are those paths:

file: '/home/work/xxx/lib/commons-math3-3.5.jar' (missing)
file: '/home/work/xxx/lib/flink-connector-kafka-0.8_2.11-1.4.2.jar' (missing)

accessible from inside of your container?

bin/flink run --help
(…)
  -C,--classpath <url>    Adds a URL to each user code classloader on all nodes
                          in the cluster. The paths must specify a protocol
                          (e.g. file://) and be accessible on all nodes (e.g.
                          by means of a NFS share). You can use this option
                          multiple times for specifying more than one URL. The
                          protocol must be supported by the
                          {@link java.net.URLClassLoader}.

Another nit: maybe the problem is the single slash after "file:". You have file:/home/..., while it might need to be file://home/...

Piotrek
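To check Piotrek's point locally, here is a minimal sketch of what a plain `java.net.URLClassLoader` (the mechanism the `-C` help text names) does with a `file:` URL. The jar path and class name are taken from the thread; on any node that does not have the jar at that exact path, the class will be reported as missing, just like in the TaskManager exception above.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Try to load a class from a file: URL, the same way the user-code
// classloader would. On a node where the jar is absent, loadClass
// fails with ClassNotFoundException.
public class ClasspathCheck {
    public static void main(String[] args) throws Exception {
        URL jar = new URL("file:/home/work/xxx/lib/commons-math3-3.5.jar");
        try (URLClassLoader cl = new URLClassLoader(new URL[] { jar })) {
            cl.loadClass("org.apache.commons.math3.util.FastMath");
            System.out.println("class resolvable on this node");
        } catch (ClassNotFoundException e) {
            System.out.println("class missing on this node");
        }
    }
}
```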
In reply to this post by Joshua Fan
> On 2018-08-03 at 19:03, Joshua Fan <[hidden email]> wrote:
> (quoted message trimmed)
Hi Joshua,

I think what you're looking for is the `-yt` option, which is used for distributing a specified directory via YARN to the TaskManager nodes. You can find its description in the Flink client by executing `bin/flink`:

  -yt,--yarnship <arg>    Ship files in the specified directory
                          (t for transfer)

Best Regards,
Paul Lam
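Putting that together with the original command, the submission might look like the sketch below. This is only an illustration: the resource numbers and paths are copied from the thread, and whether the `-C` flags can be dropped entirely is not verified here.

```shell
# Hypothetical rework of the original submission: ship the whole lib
# directory to the YARN containers with -yt, instead of pointing -C at
# paths that only exist on the client machine.
/usr/bin/hadoop/software/flink-1.4.2/bin/flink run \
  -m yarn-cluster -yn 1 -ys 8 -yjm 2148 -ytm 4096 -ynm jarsTest \
  -yt /home/work/xxx/lib \
  -c StreamExample \
  xxx-1.0.jar
```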
In reply to this post by Piotr Nowojski
Hi,

  -yt,--yarnship <arg>    Ship files in the specified directory
                          (t for transfer)

I guess that you even got a warning in your log files:

LOG.warn("Ship directory is not a directory. Ignoring it.");

I'm not sure, but maybe with `-yt` you do not even need to specify `-C`; just `-yt /home/work/xxx/lib/` should suffice.

Piotrek
Hi,
I'm glad that you have found a solution to your problem :)

To shorten the feedback loop, you can/should test as much logic as possible using smaller unit tests and some small-scale integration tests: https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html . Usually there is no need to start up a full Flink cluster and submit your WIP job during development. You can run such end-to-end tests only once, before committing/pushing/merging/deploying.

Piotrek
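As a minimal sketch of that idea: if the job's record-parsing logic is factored out of the operators into a plain helper method, it can be checked with an ordinary JVM run, no cluster submission needed. The method name and record format below are made up for illustration, not taken from the actual job.

```java
// Hypothetical unit-level check of logic that a Flink operator would
// delegate to. Because the method is plain Java, it runs in milliseconds
// without starting a cluster.
public class ParseLogicTest {
    static int extractValue(String line) {
        return Integer.parseInt(line.split(",")[1]);
    }

    public static void main(String[] args) {
        if (extractValue("sensor-1,42") != 42) {
            throw new AssertionError("unexpected parse result");
        }
        System.out.println("ok");
    }
}
```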