Hi: I have a single EMR cluster with Flink and want to run multiple applications on it with different Flink configurations. Is there a way to 1. pass the config file name for each application, or 2. override the config parameters via command-line arguments for the application? This is similar to how we can override the default parameters in Spark. I searched the documentation and have tried using ParameterTool with the config parameter names, but it has not worked so far. Thanks for your help. Mans
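For context, the usual ParameterTool pattern looks roughly like the sketch below; this is an assumption about the kind of attempt described (not the poster's actual code), and it only exposes application arguments to the job rather than overriding flink-conf.yaml entries:

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._

    object ParameterToolSketch {
      def main(args: Array[String]): Unit = {
        // reads "--key value" pairs from the program arguments
        val params = ParameterTool.fromArgs(args)
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // makes the parameters visible in the web UI and to user functions,
        // but does NOT change the cluster's flink-conf.yaml settings
        env.getConfig.setGlobalJobParameters(params)
        // example: read an application-level value with a default
        val windowSize = params.getInt("window.size", 10)
        // ... build the job on env, then call env.execute("...")
      }
    }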
Hi Singh, You can use the environment variable "FLINK_CONF_DIR" to specify the path to the directory of config files. You can also override config options with command-line arguments prefixed with -D (for yarn-session.sh) or -yD (for the 'flink run' command). Thank you~ Xintong Song On Wed, Jun 26, 2019 at 9:13 PM M Singh <[hidden email]> wrote:
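Put together, a launch on EMR/YARN might look roughly like the following sketch; the paths, option keys, and values are illustrative, not taken from the thread:

    # point the client at an application-specific config directory (illustrative path)
    export FLINK_CONF_DIR=/etc/flink/conf-app1

    # per-job YARN submission; -yD passes dynamic properties to the YARN session
    flink run -m yarn-cluster -yD parallelism.default=2 /path/to/app1.jar

    # or, when starting a long-running YARN session, use -D on yarn-session.sh
    yarn-session.sh -D taskmanager.heap.size=2048m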
Hi Xintong: Thanks for your pointers. I tried -Dparallelism.default=2 locally (in the IDE) and it did not work. Do you know if there is a common way that would work on EMR, locally, and in the IDE? Thanks again.
Hi Singh, I don't think that should work. The -D or -yD parameters need to be passed to the Flink start-up scripts or the "flink run" command. I don't think the IntelliJ VM arguments are equivalent to that. In fact, I'm not aware of any method to set "-D" parameters when running jobs from the IDE. Thank you~ Xintong Song On Fri, Jun 28, 2019 at 8:45 PM M Singh <[hidden email]> wrote:
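One workaround not mentioned in the thread, but available through Flink's public API, is to build a Configuration in code and hand it to a local environment when running inside the IDE. A minimal sketch (the option key and values are illustrative):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

    object LocalConfigSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // illustrative option; any flink-conf.yaml key can be set here
        conf.setString("state.backend", "filesystem")
        // starts an embedded mini-cluster with the given parallelism and configuration
        val env = StreamExecutionEnvironment.createLocalEnvironment(2, conf)
        // ... build the job on env, then call env.execute("...")
      }
    }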
This is because Flink doesn't unify execution across different environments. The community has discussed before how to enhance the Flink client API. The initial proposal is to introduce a FlinkConf that contains all the configuration, so that executions can be unified across all environments (IDE, CLI, SQL Client, Scala Shell, downstream projects). Here's the sample code:

val conf = new FlinkConf().setProperty("key_1", "value_1")   // create FlinkConf
val env = new ExecutionEnvironment(conf)                     // create ExecutionEnvironment
val jobId = env.submit(...)                                  // non-blocking job submission (detached mode)
val jobStatus = env.getClusterClient().queryJobStatus(jobId) // approach 1: query job status via ClusterClient
val jobStatus = env.queryJobStatus(jobId)                    // approach 2: query job status via ExecutionEnvironment

And you can refer to this for more details:
Xintong Song <[hidden email]> wrote on Fri, Jun 28, 2019 at 10:28 PM:
Best Regards
Jeff Zhang
Thanks Jeff and Xintong for your pointers.