Flink parameter configuration does not take effect


Flink parameter configuration does not take effect

Jason Lee
Hi everyone,

While researching and using Flink recently, I found the official documentation on how to configure parameters confusing, and parameters set in certain ways do not take effect. The main issues are as follows:

We usually execute Flink SQL tasks through a DDL jar package, and we found that some parameters set via StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value) do not take effect. For example, taskmanager.memory.managed.fraction has no effect when set this way (the note on TableConfig in the source code says: "Because options are read at different point in time when performing operations, it is recommended to set configuration options early after instantiating a table environment."). In addition, StreamExecutionEnvironment.getConfiguration() is protected, so some parameters cannot be set through the API at all. This does not seem reasonable to me: sometimes we want to configure different parameters for different tasks through Configuration.setXXX(key, value) in the API, instead of only through flink run -yD or flink-conf.yaml.
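To make this concrete, here is a minimal Java sketch of what I mean (the class name and option values are made up for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ConfigEffectSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Planner/runtime options such as mini-batching are read from the TableConfig
        // when the query is translated, so setting them here does take effect.
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.enabled", "true");
        tableEnv.getConfig().getConfiguration()
                .setString("table.exec.mini-batch.allow-latency", "1 s");

        // Cluster options such as the managed memory fraction are read when the
        // TaskManager process starts, so setting them here is silently ignored.
        tableEnv.getConfig().getConfiguration()
                .setString("taskmanager.memory.managed.fraction", "0.6");
    }
}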

The Configuration section of the official documentation introduces the description and default value of each parameter, but it does not explain how these parameters can be set. I think this is not friendly enough for users, especially those who want to customize some parameters, and the available setting methods should be described in the official documentation.

In summary, many tasks can run with the default configuration, but for tasks that need customized configuration, especially Flink SQL tasks, I have a few suggestions on how configuration is handled:

1. Regarding the API: for parameters configured through StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value), it should be documented separately which parameters do not take effect when set this way; otherwise parameters that are silently ignored will confuse users.

2. The official documentation should explain how these parameters can be configured, for example in flink-conf.yaml, on the command line via flink run -yD, or through any other supported mechanism.

3. Regarding StreamExecutionEnvironment.getConfiguration() being protected: does the community plan to change this in a later version? Is there an effective way for users to set certain parameters through the API and have them take effect, for example taskmanager.memory.managed.fraction?

I may not have described the above issues, or why these parameter settings do not take effect, clearly enough, or I may have misunderstood something, but I hope to get a reply from the community and discuss this further.

Best,
Jason



Re: Flink parameter configuration does not take effect

rmetzger0
Hi Jason,

How are you deploying your Flink SQL tasks? Are you using per-job/application clusters, or a session cluster?

I agree that the configuration management is not optimal in Flink. By default, I would recommend assuming that all configuration parameters are cluster settings, which require a cluster restart. Very few options (mostly those listed in the "Execution" section) are job settings, which can be set for each job.

Would it help if the table of configuration options in the documentation tagged each option with a "Cluster" or "Job" type?
Secondly, the API should probably only expose an immutable Configuration object if the configuration is effectively immutable. I believe the option to set configuration on the (Stream)(Table)Environment is mostly there for local execution of Flink.
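To illustrate that last point, here is a rough sketch (my own minimal example, option value invented) of one way to pass configuration for local execution; the Configuration is handed to the embedded mini cluster, so even cluster-style options can be tried out this way:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cluster-style option; only honoured here because the local environment
        // starts its own mini cluster with this Configuration.
        conf.setString("taskmanager.memory.managed.fraction", "0.6");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        env.fromElements(1, 2, 3).print();
        env.execute("local-config-sketch");
    }
}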


2. I agree, the docs are incomplete here (probably another symptom of the fact that the whole configuration management in Flink is not optimal). I'll see what I can do to improve the situation.

3. Except for local execution (everything runs in one JVM), I don't think we'll add support for this anytime soon. Some of the cluster configuration parameters just have to be global (like memory management), as they apply to all jobs executed on a cluster.


This ticket could be related to your problems: https://issues.apache.org/jira/browse/FLINK-21065

Let us know how you are deploying your Flink jobs; this will shed some more light on the discussion!

Best,
Robert


Re: Flink parameter configuration does not take effect

rmetzger0
Hi Jason,

I hope you don't mind that I brought back the conversation to the user@ mailing list, so that others can benefit from the information as well.

Thanks a lot for sharing your use case. I personally believe that Flink should support invocations like "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json". There is no fundamental reason why this cannot be supported.

The Javadoc about tableEnv.getConfig() mentions that the config is only about the "runtime behavior":
... but I see how this is not clearly defined.

As a short-term fix, I've proposed to clarify in the configuration table which options are cluster vs job configurations: https://issues.apache.org/jira/browse/FLINK-22257.

But in the long term, we certainly need to improve the user experience.


On Wed, Jun 16, 2021 at 3:31 PM Jason Lee <[hidden email]> wrote:
Dear Robert,

For tasks running on a cluster, some parameter configurations are global, but others need to be customized per task, such as some TaskManager memory settings. Tasks with different state sizes need different parameter values, and these parameters should not have to be configured in flink-conf.yaml. But in current Flink, these parameters cannot be configured through StreamExecutionEnvironment, and some of them have no effect when set through StreamTableEnvironment.

At the same time, it is correct that the Configuration is immutable after the task is started, but I think some global parameters should still be settable through StreamExecutionEnvironment. Some checkpoint parameters are also global, yet they can be set through StreamExecutionEnvironment.getCheckpointConfig().set(), so why can't TaskManager memory parameters be set in the same way? In my view, setting a global parameter via "flink run -yD" is equivalent to setting it via StreamExecutionEnvironment, but I am not sure whether I understand this correctly.
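To show the asymmetry I mean, here is a rough sketch (values invented; the memory setter is shown only as a hypothetical name that does not exist):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobLevelSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is also a "global" concern for the job, yet it is exposed via the API:
        env.enableCheckpointing(60_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000L);

        // There is no comparable setter for TaskManager memory; something like
        // env.setManagedMemoryFraction(0.6) is purely hypothetical and does not exist.
        // Today such options can only go through flink-conf.yaml or flink run -yD.
    }
}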

I agree with you. The Configuration section of the official documentation should specify which parameters are best configured in flink-conf.yaml, which can be modified through StreamExecutionEnvironment, and which can only be changed in other ways. That would make the documentation clearer for users.

Best,
Jason

On 06/16/2021 21:04, [hidden email] wrote:
Dear Robert,

Thanks for your answer

Our Flink SQL tasks are deployed in per-job mode.

We provide our users with a platform for developing Flink SQL tasks. We write the user's SQL code and configuration parameters into a config.json file, and we have developed a Flink jar job that actually executes the user's SQL from the command line. For example, this is the command we use to start a Flink SQL task: "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json". To make it easy for users to customize parameters, we would like to apply the user's configuration parameters inside the execution environment of our FlinkStreamSQLDDLJob class, for example the taskmanager.memory.managed.fraction parameter, but at the moment such parameters cannot be configured through the Flink execution environment because they do not take effect; they can only be configured with flink run -yD.
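For context, here is a much simplified sketch of what our FlinkStreamSQLDDLJob driver does (the real code parses config.json; a plain map stands in for it here, and the option names are just examples):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkStreamSQLDDLJobSketch {
    public static void main(String[] args) {
        // Stand-in for the parsed config.json with the user's parameters.
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("table.exec.state.ttl", "1 h");                 // job-level: takes effect
        userConfig.put("taskmanager.memory.managed.fraction", "0.6");  // cluster-level: ignored here

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Forwarding everything to the TableConfig only helps for job-level options;
        // the cluster-level ones currently have to become flink run -yD arguments instead.
        userConfig.forEach((key, value) ->
                tableEnv.getConfig().getConfiguration().setString(key, value));

        // ... then run the user's SQL, e.g. tableEnv.executeSql(...), which is omitted here.
    }
}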

I think the Configuration section of the official documentation should state which parameters cannot be set through StreamTableEnvironment.getConfig().getConfiguration().set() and can only be set through flink run -yD or flink-conf.yaml. Currently the documentation does not explain that setting some parameters this way has no effect. To help users customize configuration parameters, I think these instructions should appear in the Configuration section of the official documentation.

Best,
Jason


Re: Flink parameter configuration does not take effect

Jason Lee
Hi Robert,

Thank you for your detailed answers.

I now understand the current situation and look forward to the community improving and optimizing this. I will continue to follow the community's progress.

Best,
Jason

