classloader.resolve-order is not honored when submitting job to a remote cluster

tao xiao
Hi team,

I discovered that the child-first class loader is always used to load the main program when a job is submitted to a YARN cluster in application mode, regardless of the value of classloader.resolve-order in flink-conf.yaml. This is not the case when I submit the same job with the same config to a local cluster, which honors the setting and uses the configured class loader to load the main program. Here is the log from the local cluster:

2021-05-30 15:01:16,372 INFO  org.apache.flink.client.cli.CliFrontend                      [] - --------------------------------------------------------------------------------
2021-05-30 15:01:16,375 INFO  org.apache.flink.client.cli.CliFrontend                      [] -  Starting Command Line Client (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
2021-05-30 15:01:16,616 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: classloader.resolve-order, parent-first
2021-05-30 15:01:16,763 WARN  org.apache.flink.runtime.util.HadoopUtils                    [] - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
[trim down the log]
2021-05-30 15:01:16,830 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: false)
2021-05-30 15:01:16,871 INFO  io.demo.flink.WordCount                                      [] - Loaded by org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ParentFirstClassLoader@14c053c6

Here is the log from the YARN cluster:
2021-05-30 07:20:14,434 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - --------------------------------------------------------------------------------
2021-05-30 07:20:14,438 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting YarnApplicationClusterEntryPoint (Version: 1.12.1, Scala: 2.11, Rev:dc404e2, Date:2021-01-09T14:46:36+01:00)
[trim down the log]
2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: taskmanager.memory.process.size, 2048m
2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: classloader.resolve-order, parent-first
2021-05-30 07:20:15,205 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: metrics.scope.jm, flink.jobmanager
[trim down the log]
2021-05-30 07:20:21,383 INFO  io.demo.flink.WordCount                                      [] - Loaded by org.apache.flink.util.ChildFirstClassLoader@3da30852
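
The two loader names in these logs correspond to two delegation orders. As a rough illustration only, here is a toy model in plain Java. It is not Flink's implementation: the class ResolveOrderSketch, its resolve method and the class name com.example.Shared are all made up for this sketch, and the real child-first loader additionally keeps a list of patterns that are always resolved parent-first.

```java
import java.util.Set;

/** Toy model of class-loader delegation order; hypothetical, not Flink code. */
final class ResolveOrderSketch {

    enum Order { PARENT_FIRST, CHILD_FIRST }

    /**
     * Returns which side supplies the class: "parent" (the framework classpath),
     * "child" (the user jar), or "not found" if neither side has it.
     */
    static String resolve(String name, Order order,
                          Set<String> parentClasses, Set<String> userJarClasses) {
        boolean inParent = parentClasses.contains(name);
        boolean inChild = userJarClasses.contains(name);
        if (order == Order.PARENT_FIRST) {
            // Ask the parent first; fall back to the user jar.
            if (inParent) return "parent";
            if (inChild) return "child";
        } else {
            // Ask the user jar first; fall back to the parent.
            if (inChild) return "child";
            if (inParent) return "parent";
        }
        return "not found";
    }

    public static void main(String[] args) {
        // The same (hypothetical) class is present on both sides.
        Set<String> parent = Set.of("com.example.Shared");
        Set<String> userJar = Set.of("com.example.Shared");
        System.out.println(resolve("com.example.Shared", Order.PARENT_FIRST, parent, userJar)); // parent
        System.out.println(resolve("com.example.Shared", Order.CHILD_FIRST, parent, userJar));  // child
    }
}
```

The order only matters for classes that exist on both sides, which is why a wrong resolve order tends to show up as dependency version conflicts rather than as a missing class.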

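Here is the job to reproduce the problem:
package io.demo.flink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WordCount {

    // assumed: an SLF4J logger; the original snippet used LOG without showing its declaration
    private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // log which class loader loaded the main class
        LOG.info("Loaded by {}", WordCount.class.getClassLoader());

        // get input data
        DataStreamSource<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        // print the input and run the job
        text.print();
        env.execute("demo job");
    }
}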
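
The LOG.info("Loaded by {}", ...) line reports only the immediate loader of the main class. When comparing the local and YARN runs it may also help to see the whole parent chain. A minimal sketch using only the JDK follows; the class ClassLoaderChain and its describe method are hypothetical names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists a class loader and all of its parents. */
final class ClassLoaderChain {

    /** Returns the loader class names from the given loader up to the bootstrap loader. */
    static List<String> describe(ClassLoader loader) {
        List<String> chain = new ArrayList<>();
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            chain.add(cl.getClass().getName());
        }
        // A null parent conventionally stands for the bootstrap class loader.
        chain.add("bootstrap");
        return chain;
    }

    public static void main(String[] args) {
        // Print the chain for this class; in a job this would be the main class instead.
        System.out.println(String.join(" -> ", describe(ClassLoaderChain.class.getClassLoader())));
    }
}
```

Inside the job, the result of describe(WordCount.class.getClassLoader()) could be passed to the existing LOG.info call.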

Flink version 1.12.1

I believe the inconsistency is the result of the user-defined flink-conf not being passed to PackagedProgram, which uses the default config instead.
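
If that hypothesis is right, the YARN log is what one would expect, because the default value of classloader.resolve-order is child-first. The following toy sketch shows the effect with plain maps. It does not use Flink's Configuration or PackagedProgram classes, and ResolveOrderDefaultSketch and effectiveOrder are hypothetical names.

```java
import java.util.Map;

/** Toy illustration of falling back to the default resolve order; not Flink code. */
final class ResolveOrderDefaultSketch {

    static final String KEY = "classloader.resolve-order";
    // Flink's documented default for this option.
    static final String DEFAULT_ORDER = "child-first";

    /** Returns the configured order, or the default when the key is absent. */
    static String effectiveOrder(Map<String, String> conf) {
        return conf.getOrDefault(KEY, DEFAULT_ORDER);
    }

    public static void main(String[] args) {
        // Config as loaded from flink-conf.yaml in the logs.
        Map<String, String> loaded = Map.of(KEY, "parent-first");
        // An empty config, standing in for a program that never received the loaded one.
        Map<String, String> empty = Map.of();

        System.out.println(effectiveOrder(loaded)); // parent-first
        System.out.println(effectiveOrder(empty));  // child-first
    }
}
```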


I am not sure whether this is expected behavior, i.e. whether we should never assume that the main program is loaded with the configured class loader.
--
Regards,
Tao

Re: classloader.resolve-order is not honored when submitting job to a remote cluster

Till Rohrmann
Hi Tao,

This looks like a bug to me. Could it be that this problem is covered by [1, 2]? Maybe you can review this PR and check whether it solves the problem. If so, let's get it in quickly.


Cheers,
Till

On Sun, May 30, 2021 at 9:41 AM tao xiao <[hidden email]> wrote:

Re: classloader.resolve-order is not honored when submitting job to a remote cluster

tao xiao
Hi Till,

The PR covers the problem and will fix the inconsistent class loading issue.

On Tue, Jun 1, 2021 at 9:55 PM Till Rohrmann <[hidden email]> wrote:


--
Regards,
Tao