Hello,

I'm having issues editing the default Flink memory settings. I'm attempting to run a Flink task on a cluster at scale. The log shows my edited config settings having been read into the program, but they're having no effect. Here's the trace:

17/06/05 23:45:41 INFO flink.FlinkRunner: Starting execution of Flink program.
17/06/05 23:45:41 INFO java.ExecutionEnvironment: The job has 0 registered types and 0 default Kryo serializers
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.size, 100
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.heap.mb, 256
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.debug.memory.startLogThread, true
17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Disabled queryable state server
17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Starting FlinkMiniCluster.
. . .
17/06/05 23:45:41 INFO network.NetworkEnvironment: Starting the network environment and its components.
17/06/05 23:45:41 INFO taskmanager.TaskManager: Limiting managed memory to 17592186044406 MB, memory will be allocated lazily.
17/06/05 23:45:41 ERROR flink.FlinkRunner: Pipeline execution failed
java.lang.IllegalArgumentException: Size of total memory must be positive.

And here's the config file I'm using:

taskmanager.memory.size: 100
taskmanager.heap.mb: 256
taskmanager.debug.memory.startLogThread: true

In the shell script I'm using to run the task, I've edited the FLINK_CONF_DIR to direct to the config file that I created. If anyone has any advice or inputs it would be much appreciated.

Thanks!
Will Walters.
|
Hi Will,
How are you starting your cluster/executing your program? From what I can gather you are using Beam, is that right? The line about the FlinkMiniCluster seems strange because this would hint at the fact that the Runner is trying to execute in local mode.
Best,
Aljoscha
|
Aljoscha,

You're correct that I'm using Beam. Here's the shell script I'm using to start the job:

hadoop jar **path to jar file** org.apache.beam.examples.complete.TfIdf \
    --runner=FlinkRunner \
    --input= **path to directory** \
    --output=tfIdf

There are two lines before that to set the Flink config path, as well as the Hadoop classpath.

Thank you,
Will.

On Tuesday, June 6, 2017 5:45 AM, Aljoscha Krettek <[hidden email]> wrote:
> Hi Will,
> How are you starting your cluster/executing your program? From what I can gather you are using Beam, is that right? The line about the FlinkMiniCluster seems strange because this would hint at the fact that the Runner is trying to execute in local mode.
> Best, Aljoscha
|
Hi Will,
I’m afraid that will simply run your program in Flink local mode, where the configuration settings are ignored because an in-process Flink cluster is being started. For running a Beam pipeline on YARN you have two options right now:

1. Start a Flink YARN session as described here https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session and, once the YARN session is up and has printed the JobManager details that you need to submit the Beam job, submit your Beam job as described here https://beam.apache.org/get-started/quickstart-java/#run-wordcount
2. Package your program as a fat jar. The quickstart from the Beam website should have the right settings in the POM for doing that. Then use regular Flink submission to submit that jar to a YARN cluster as described here https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#run-a-single-flink-job-on-yarn

Best,
Aljoscha
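For reference, the two options above might look roughly like the following. This is only a sketch, not a tested recipe: FLINK_HOME, the jar name, the container count, and the memory sizes are placeholder assumptions, and the Maven call assumes a project with a flink-runner profile like the one in the Beam quickstart POM. The flags are the ones documented for Flink 1.3 and may differ in other versions. By default the script only prints the commands; set DRY_RUN=0 to execute them.

```shell
#!/bin/sh
# Dry-run sketch: prints the commands unless DRY_RUN=0 is set.
# Assumptions (not from the thread): FLINK_HOME, JOB_JAR, the container
# count, and the memory sizes are placeholders to adapt to your cluster.

FLINK_HOME="${FLINK_HOME:-/opt/flink}"
JOB_JAR="${JOB_JAR:-target/beam-job-bundled.jar}"   # hypothetical fat jar name
MAIN_CLASS="${MAIN_CLASS:-org.apache.beam.examples.complete.TfIdf}"

# Print the command in dry-run mode, otherwise execute it.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Option 1, step 1: start a detached Flink session on YARN.
# -n = number of TaskManager containers; -jm / -tm = JobManager /
# TaskManager memory per container, in MB.
start_yarn_session() {
  run "$FLINK_HOME/bin/yarn-session.sh" -d -n 4 -jm 1024 -tm 4096
}

# Option 1, step 2: submit the Beam job to the session's JobManager.
# $1 = JobManager address (host:port) printed by the session, $2 = input path.
submit_to_session() {
  run mvn exec:java -Pflink-runner \
    -Dexec.mainClass="$MAIN_CLASS" \
    -Dexec.args="--runner=FlinkRunner --flinkMaster=$1 --filesToStage=$JOB_JAR --input=$2 --output=tfIdf"
}

# Option 2: submit the fat jar as a single Flink job on YARN.
# $1 = input path. The -y* flags mirror the session flags above.
submit_single_job() {
  run "$FLINK_HOME/bin/flink" run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 \
    -c "$MAIN_CLASS" "$JOB_JAR" \
    --runner=FlinkRunner --input="$1" --output=tfIdf
}
```

Calling start_yarn_session followed by submit_to_session with the printed JobManager address corresponds to option 1; submit_single_job corresponds to option 2. In both cases the TaskManager memory is set through -tm or -ytm at submission time, rather than through the local-mode run shown earlier in the thread.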
|