Add custom configuration files to TMs classpath on YARN

Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin
Hi all,

I run my Flink job on a YARN cluster and need to supply job configuration parameters via a configuration file shipped alongside the job jar (the configuration file can't be packaged into the job's jar file).
I tried to put the configuration file into the folder that is passed via the --yarnship option of the flink run command. The file is then copied to the YARN cluster and added to the JVM classpath as 'path/application.conf', but it is ignored by the TM JVM because such an entry is neither a jar (zip) file nor a directory...
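For illustration, this is roughly how the job reads such a file (a sketch only; I'm assuming a Typesafe-Config-style application.conf here, which is resolved from the classpath, and "some.key" is just a placeholder):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class JobConfig {
    public static void main(String[] args) {
        // ConfigFactory.load() resolves application.conf via the JVM classpath,
        // which is why the shipped file has to be visible there.
        Config config = ConfigFactory.load();
        System.out.println(config.getString("some.key")); // "some.key" is a placeholder
    }
}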

I looked through the YarnClusterDescriptor class where the ENV_FLINK_CLASSPATH is built and haven't found any option to tell Flink (the YarnClusterDescriptor in particular) to add my configuration file to the TM JVM classpath... Is there any way to do so? If not, would you consider adding such an ability (like Spark's --files option, which lets me pass arbitrary files)?

Thanks in advance.

Kind Regards,
Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin
Hi guys,

Any news? I've created a JIRA ticket for this: https://issues.apache.org/jira/browse/FLINK-6949.


Kind Regards,
Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

Nico Kruber
A workaround may be to use the DistributedCache. It apparently is not documented much, but the JavaDoc mentions roughly how to use it:

https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L954

/**
 * Registers a file at the distributed cache under the given name. The file will be accessible
 * from any user-defined function in the (distributed) runtime under a local path. Files
 * may be local files (as long as all relevant workers have access to it), or files in a
 * distributed file system. The runtime will copy the files temporarily to a local cache,
 * if needed.
 * <p>
 * The {@link org.apache.flink.api.common.functions.RuntimeContext} can be obtained inside UDFs
 * via {@link org.apache.flink.api.common.functions.RichFunction#getRuntimeContext()} and
 * provides access to {@link org.apache.flink.api.common.cache.DistributedCache} via
 * {@link org.apache.flink.api.common.functions.RuntimeContext#getDistributedCache()}.
 *
 * @param filePath The path of the file, as a URI (e.g. "file:///some/path" or
 *                 "hdfs://host:port/and/path")
 * @param name The name under which the file is registered.
 */
public void registerCachedFile(String filePath, String name) {
    registerCachedFile(filePath, name, false);
}

You could pass the actual file URL to use for each instance of your job that
requires a different file via a simple job parameter:


public static void main(String[] args) throws Exception {
    ParameterTool params = ParameterTool.fromArgs(args);

    ...

    env.registerCachedFile(params.get("config_file", <default/path>), "extConfig");

    ...
}

Flink's DistributedCache will then cache the file locally and you can use it in a RichFunction like in
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/distributedCache/DistributedCacheTest.java#L99

public class MyFunction extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    @Override
    public void open(Configuration conf) throws IOException {
        File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
        ...
    }
}
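
If the shipped file happens to be a plain .properties file, the body of open() could parse it like this (just a sketch under that assumption; adapt the parsing to whatever format your configuration actually uses):

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.configuration.Configuration;

public class ConfigLoadingFunction extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    private transient Properties config;

    @Override
    public void open(Configuration conf) throws Exception {
        // "extConfig" matches the name used in registerCachedFile() above
        File file = getRuntimeContext().getDistributedCache().getFile("extConfig");
        config = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            config.load(in); // assumes java.util.Properties syntax
        }
    }
}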


Nico


Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin
Hi Nico!
Sounds great, I'll give it a try and report back with the results soon.

Thank you so much for your help!!

Kind Regards,
Mike Pryakhin


Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin
Hi Nico,

Thanks a lot for your help, but unfortunately the workaround you suggested doesn't work for me.
I tried to leverage the StreamExecutionEnvironment#registerCachedFile method but failed, because that instance is created after the application master has already been started; by then the classpath used to run the application on the YARN cluster has already been assembled by org.apache.flink.yarn.YarnClusterClient. In my case, I need to be able to pass a local folder at the moment I submit the application so that it is included in the application's YARN classpath.
The option you suggested works well if the file is already accessible at the moment I register it (for example, a file on HDFS).

Is there any way we can extend org.apache.flink.yarn.YarnClusterClient to pass user-specified folders to the YARN application classpath?



Kind Regards,
Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

Robert Metzger
Hi Mike,

To use the DistributedCache approach, you need HDFS or another distributed FS available to distribute the files.

I would actually like to understand why you said "then this file is copied to the yarn cluster and added to JVM class [...] but is ignored by TM JVM as it is neither jar(zip) file nor directory...".
I believe you should be able to load non-class files through the classloader as well.
Did you see any code that excludes non-class files? Afaik the TaskManagers have access to all files (of any type) that are passed using the --ship command (or in the lib/ folder).
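
For example, something along these lines should find the file once it is reachable through a classpath entry (a minimal sketch; "application.conf" is just a stand-in name):

import java.io.InputStream;

public final class ClasspathConfigCheck {
    public static void main(String[] args) {
        // The classloader resolves resources relative to classpath entries
        // (directories and jar/zip archives), so the shipped file must live
        // under such an entry to be found here.
        InputStream in = Thread.currentThread()
                .getContextClassLoader()
                .getResourceAsStream("application.conf");
        System.out.println(in != null ? "found application.conf" : "not found");
    }
}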



Re: Add custom configuration files to TMs classpath on YARN

Mikhail Pryakhin
Hi Robert,

Thanks for your reply!

>I believe you should be able to load non-class files through the classloader as well.
Could you please clarify what you mean by this?

>Did you see any code that excludes non-class files? 
No, I didn't, but I did see the following code [1]:

if (shipFile.isDirectory()) {
    // add directories to the classpath
    java.nio.file.Path shipPath = shipFile.toPath();
    final java.nio.file.Path parentPath = shipPath.getParent();

    Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
        @Override
        public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
                throws IOException {
            java.nio.file.Path relativePath = parentPath.relativize(file);

            classPaths.add(relativePath.toString());

            return FileVisitResult.CONTINUE;
        }
    });
} else {
    // add files to the classpath
    classPaths.add(shipFile.getName());
}

The code above traverses the contents of the folder I passed via the --yarnship option and appends every file, including non-class files, to the classpath when the ship file is a directory. That eventually gives no result, because only directories and JAR (ZIP) archives are valid JVM classpath entries; an entry like 'path/application.conf' is simply ignored.

I believe that if the code above didn't traverse directory contents, everything would work as expected: if I pass a file, it is appended to the classpath as-is; if I specify a folder, it should go onto the classpath as a folder (see the sketch below).

At the moment it is also not possible to pass multiple --yarnship options, so I have created another JIRA ticket [2] that proposes adding the ability to specify multiple yarnship folders.
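
To make the idea concrete, here is a rough sketch of the non-traversing variant (not a tested patch, just the behaviour I'd expect, in terms of the snippet above):

// Hypothetical change: register the shipped entry (file or directory) as-is,
// without walking directory contents. A directory entry is a valid classpath
// element, so the files inside it become loadable as classloader resources.
classPaths.add(shipFile.getName());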

What do you think about that?

[1] https://github.com/apache/flink/blob/2eaf92b1f0a0d965c14b65755d25f1a8167de023/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java#L1003
[2] https://issues.apache.org/jira/browse/FLINK-6950


Thanks in advance

Kind Regards,
Mike Pryakhin

Re: Add custom configuration files to TMs classpath on YARN

Haohui Mai
What we did internally is to inherit from AbstractYarnClusterDescriptor and customize from there.

It's not too difficult, but it would be nice to see this taken care of by AbstractYarnClusterDescriptor itself.

~Haohui
