While recently researching and using Flink, I found the official documentation on how to configure parameters confusing, and some parameters I set do not take effect. The main issues are as follows:
We usually use a DDL JAR package to execute Flink SQL tasks, but we found that some parameters set via StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value) do not take effect. For example, taskmanager.memory.managed.fraction has no effect when set this way. (The note on TableConfig in the source code reads: "Because options are read at different point in time when performing operations, it is recommended to set configuration options early after instantiating a table environment.") In addition, StreamExecutionEnvironment.getConfiguration() is protected, so some parameters cannot be set through the API at all. This does not seem reasonable to me: sometimes we want to configure different parameters for different tasks via Configuration.setXXX(key, value) in the API, instead of only through flink run -yD or flink-conf.yaml.
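For illustration, a cluster-level option like this one is generally expected to be set before the cluster starts, e.g. in flink-conf.yaml. This is only a sketch; the 0.4 value below is just the documented default, used here as an example:

```yaml
# flink-conf.yaml — read once at cluster startup,
# so changing this value requires restarting the TaskManagers;
# setting it later through the table environment has no effect.
taskmanager.memory.managed.fraction: 0.4
```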
The Configuration module of the official documentation describes each parameter and its default value, but it says nothing about how to set these parameters. I think this is not friendly enough for users, especially users who want to customize some parameters. I feel this should be described in the official documentation.
In summary, for ordinary tasks the default parameter configuration is fine, but for tasks that require a customized configuration, especially Flink SQL tasks, I have a few suggestions on the use of configuration:
1. Regarding the API: when parameters are configured via StreamTableEnvironment.getConfig().getConfiguration().setXXX(key, value), the documentation should explain separately which parameters do not take effect when set this way; otherwise, parameters configured this way that silently have no effect will cause confusion for users.
2. In the official documentation, I think it is necessary to add instructions on how these parameters can be configured: for example, not only in flink-conf.yaml, but also on the command line via flink run -yD, as well as any other modes in which parameters can be configured.
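As an illustrative sketch of the command-line route mentioned above (the JAR name and the option value are placeholders, not from a real setup), a cluster-level option can be passed as a dynamic property at submission time instead of editing flink-conf.yaml:

```
# -yD forwards a dynamic property to the YARN cluster at submission time,
# so the option applies to the cluster spun up for this job:
flink run -m yarn-cluster \
  -yD taskmanager.memory.managed.fraction=0.5 \
  my-job.jar
```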
3. Regarding StreamExecutionEnvironment.getConfiguration() being protected: will the community change this in later versions? Is there an effective way for users to set some parameters through the API and have them take effect, for example the taskmanager.memory.managed.fraction parameter?
Regarding the issues above, and why these parameter settings do not take effect: maybe I have not described things clearly enough, or I have misunderstood the problem, but I hope to get a reply from and discuss this with the community.
How are you deploying your Flink SQL tasks? (Are you using per-job/application clusters, or a session cluster?)
I agree that configuration management is not optimal in Flink. By default, I would recommend assuming that all configuration parameters are cluster settings, which require a cluster restart. Very few options (mostly those listed in the "Execution" section) are job settings, which can be set per job.
Would it help if the table of configuration options in the documentation tagged each option with a "Cluster" or "Job" option type?
Secondly, if the configuration is effectively immutable, the API should probably only expose an immutable Configuration object. I believe the option to set configuration on the (Stream)(Table)Environment exists mostly for local execution of Flink.
2. I agree, the docs are incomplete here (probably another symptom of the fact that configuration management in Flink as a whole is not optimal). I'll see what I can do to improve the situation.
3. Except for local execution (where everything runs in one JVM), I don't think we'll add support for this anytime soon. Some cluster configuration parameters simply have to be global (like memory management), since they apply to all jobs executed on a cluster.
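To make the distinction concrete, here is a rough sketch of the split, using a few well-known options as examples (the classification below is my assumption, not an official list):

```yaml
# Cluster settings — read at cluster startup, global to all jobs;
# changing them requires a cluster restart:
taskmanager.memory.managed.fraction: 0.4
taskmanager.numberOfTaskSlots: 2

# Job settings — can differ per job, e.g. set on the environment's
# configuration before the job is submitted:
pipeline.max-parallelism: 128
execution.checkpointing.interval: 1min
```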
Let us know how you are deploying your Flink jobs; this will shed some more light on the discussion!
I hope you don't mind that I brought back the conversation to the user@ mailing list, so that others can benefit from the information as well.
Thanks a lot for sharing your use case. I personally believe that Flink should support invocations like "flink run -m yarn-cluster xxx.FlinkStreamSQLDDLJob flink-stream-sql-ddl-1.0.0.jar ./config.json". There is no fundamental reason why this cannot be supported.
The Javadoc for tableEnv.getConfig() mentions that the config only concerns the "runtime behavior":
... but I see how this is not clearly defined.
As a short-term fix, I've proposed to clarify in the configuration table which options are cluster vs job configurations: https://issues.apache.org/jira/browse/FLINK-22257.
But in the long term, we certainly need to improve the user experience.
On Wed, Jun 16, 2021 at 3:31 PM Jason Lee <[hidden email]> wrote:
Thank you for your enthusiastic answers.
I have understood the current problem and look forward to a good solution and optimization by the community. I will continue to pay attention to changes in the community.