How to handle startup for mandatory config parameters?


How to handle startup for mandatory config parameters?

John Smith
Hi, so I have no problem reading config from resources files or anything like that...

But my question is around how do we handle mandatory fields?

1- If a mandatory field is missing during startup, do we just "log" it and do System.exit()?
2- If we do log it, where does the log end up: on the task node or the job node?

Re: How to handle startup for mandatory config parameters?

Yang Wang
Hi John,

Most of the config options have default values. However, you still need to specify some
required fields, for example the TaskManager resource-related options. If you do not specify
any of them, an exception will be thrown on the client side like the following:

Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Either Task Heap Memory size (taskmanager.memory.task.heap.size) and Managed Memory size (taskmanager.memory.managed.size), or Total Flink Memory size (taskmanager.memory.flink.size), or Total Process Memory size (taskmanager.memory.process.size) need to be configured explicitly.
at org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:149)
at org.apache.flink.runtime.util.BashJavaUtils.getTmResourceJvmParams(BashJavaUtils.java:62)
at org.apache.flink.runtime.util.BashJavaUtils.main(BashJavaUtils.java:46)


Also, when you deploy Flink on a YARN cluster, it will check the queue configuration, resources, etc.
If a config exception is thrown during startup, the Flink client will fail and print the exception to
the console and the client logs (usually in the {FLINK_HOME}/log directory).

However, not all the config options can be checked on the client side. For example, if you set a
wrong checkpoint path, you will need to look for the exceptions or errors in the JobManager logs.
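
To make the either/or rule above concrete, here is a stand-alone sketch of the kind of check Flink performs on the client side. This is plain Java illustrating the pattern, not the actual TaskExecutorResourceUtils code; the class and method names are made up for the example.

```java
import java.util.Map;

// Illustrative sketch (not Flink source): fail fast when none of the
// accepted TaskManager memory configurations is present.
public class MemoryConfigCheck {

    public static void validate(Map<String, String> conf) {
        boolean taskHeapAndManaged =
                conf.containsKey("taskmanager.memory.task.heap.size")
                && conf.containsKey("taskmanager.memory.managed.size");
        boolean totalFlink = conf.containsKey("taskmanager.memory.flink.size");
        boolean totalProcess = conf.containsKey("taskmanager.memory.process.size");

        if (!(taskHeapAndManaged || totalFlink || totalProcess)) {
            throw new IllegalStateException(
                    "Either task heap + managed memory, total Flink memory, "
                    + "or total process memory must be configured explicitly.");
        }
    }

    public static void main(String[] args) {
        // Passes: total process memory is configured.
        validate(Map.of("taskmanager.memory.process.size", "1728m"));

        // Fails: no memory option configured at all.
        try {
            validate(Map.of());
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check runs in the client before anything is deployed, a missing option surfaces immediately on the console rather than somewhere in the cluster logs.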



Best,
Yang


Re: How to handle startup for mandatory config parameters?

John Smith
Sorry, I should have specified: how do we handle job-specific config parameters using ParameterTool?

ParameterTool parameters = ...

String someConfig = parameters.get("some.config"); // <-- this one is mandatory

Do I check someConfig for whatever requirement and just throw an exception before starting the job, or should I do System.exit() and log it? And if I do log it, where does the log end up?
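
For reference, ParameterTool also has getRequired(key), which throws a RuntimeException when the key is absent, so the check does not have to be hand-rolled. Below is a stand-alone sketch of that fail-fast pattern; Params here is a hypothetical stand-in so the example runs without Flink on the classpath, not the real ParameterTool.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ParameterTool, illustrating the fail-fast idea
// behind getRequired(): a missing mandatory key throws before the job graph
// is ever built.
public class Params {
    private final Map<String, String> values = new HashMap<>();

    public Params put(String key, String value) {
        values.put(key, value);
        return this;
    }

    public String getRequired(String key) {
        String v = values.get(key);
        if (v == null) {
            throw new RuntimeException("No data for required key '" + key + "'");
        }
        return v;
    }

    public static void main(String[] args) {
        Params parameters = new Params().put("some.config", "value");
        System.out.println(parameters.getRequired("some.config"));
        // parameters.getRequired("missing.key") would throw a RuntimeException,
        // failing main() before env.execute() is reached.
    }
}
```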


Re: How to handle startup for mandatory config parameters?

Biao Liu
Hi John,

ParameterTool is just a utility to help users handle arguments.
I guess you are using ParameterTool in the main method. If so, the log should be in the client log file; like Yang said, it's under "{FLINK_HOME}/log".

> Do I check someConfig for what ever requirement and just throw an exception before starting the job or should I do System.exit();

I'm not sure what exactly you want.
Throwing an exception or calling System.exit() would both fail the job (it depends on where your code is). However, invoking System.exit() is not always good practice.

Thanks,
Biao /'bɪ.aʊ/




Re: How to handle startup for mandatory config parameters?

John Smith
Hi, let me see if I can be more clear....

When the job is launched, before the two calls below in main() we read some configs; whether it's ParameterTool or a file or whatever doesn't matter. Some of those params are mandatory.
I'm guessing it's better to log and throw an exception so that main() can fail/exit and the job never starts, right?

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
env.execute(jobName);




Re: How to handle startup for mandatory config parameters?

John Smith
Ok, perfect. Thanks!

On Fri, 17 Jan 2020 at 11:39, Seth Wiesman <[hidden email]> wrote:
Yes, the preferred method is to log and throw an exception prior to calling `execute`.

The logs will be on the Flink dispatcher, and the exception will be returned wrapped in a failed-deployment exception. You do not want to call System.exit(), because that will shut down the entire Flink cluster when you just want to fail the deployment.
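
Sketched out, the recommendation above looks something like this: validate and log before building the job, and throw rather than call System.exit(). The Flink-specific calls are left as comments so the snippet stays self-contained; requireConfig is a hypothetical helper for the example, not Flink API.

```java
// Hypothetical helper showing the validate-then-throw pattern: log the
// problem, then throw so that only this deployment fails. System.exit()
// is avoided because it can take down more than the one failed job.
public class StartupValidation {

    static String requireConfig(String name, String value) {
        if (value == null || value.isEmpty()) {
            System.err.println("Missing mandatory config: " + name);
            throw new IllegalArgumentException("Missing mandatory config: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // Validate every mandatory parameter before touching the environment.
        String jobName = requireConfig("job.name", "my-flink-job");

        // final StreamExecutionEnvironment env =
        //         StreamExecutionEnvironment.getExecutionEnvironment();
        // ...
        // env.execute(jobName);
        System.out.println("validated, would submit: " + jobName);
    }
}
```

If requireConfig throws, main() exits before env.execute() is ever called, so the job is never submitted and the error lands in the client/dispatcher logs as described above.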
