Issue With Configuration File

Will Walters
Hello,

I'm having trouble changing the default Flink memory settings. I'm trying to run a Flink task on a cluster at scale. The log shows that my edited config settings are read by the program, but they have no effect. Here's the trace:

17/06/05 23:45:41 INFO flink.FlinkRunner: Starting execution of Flink program.
17/06/05 23:45:41 INFO java.ExecutionEnvironment: The job has 0 registered types and 0 default Kryo serializers
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.memory.size, 100
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.heap.mb, 256
17/06/05 23:45:41 INFO configuration.GlobalConfiguration: Loading configuration property: taskmanager.debug.memory.startLogThread, true
17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Disabled queryable state server
17/06/05 23:45:41 INFO minicluster.FlinkMiniCluster: Starting FlinkMiniCluster.
.
.
.
17/06/05 23:45:41 INFO network.NetworkEnvironment: Starting the network environment and its components.
17/06/05 23:45:41 INFO taskmanager.TaskManager: Limiting managed memory to 17592186044406 MB, memory will be allocated lazily.
17/06/05 23:45:41 ERROR flink.FlinkRunner: Pipeline execution failed
java.lang.IllegalArgumentException: Size of total memory must be positive.

And here's the config file I'm using:

taskmanager.memory.size: 100
taskmanager.heap.mb: 256
taskmanager.debug.memory.startLogThread: true

In the shell script I'm using to run the task, I've set FLINK_CONF_DIR to point to the config file that I created.

If anyone has any advice or input, it would be much appreciated.

Thanks!
Will Walters.

Re: Issue With Configuration File

Aljoscha Krettek
Hi Will,

How are you starting your cluster and executing your program? From what I can gather, you are using Beam, is that right? The line about the FlinkMiniCluster seems strange because it suggests that the Runner is trying to execute in local mode.

Best,
Aljoscha

Re: Issue With Configuration File

Will Walters
Aljoscha,

You're correct that I'm using Beam. Here's the shell script I'm using to start the job:

hadoop jar **path to jar file** org.apache.beam.examples.complete.TfIdf \
--runner=FlinkRunner \
--input=**path to directory** \
--output=tfIdf

There are two lines before that to set the Flink config path, as well as the Hadoop classpath.

Thank you,
Will.


Re: Issue With Configuration File

Aljoscha Krettek
Hi Will,

I’m afraid that will simply run your program in Flink local mode, where the configuration settings are ignored because an in-process Flink Cluster is being started.
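
A side note on the trace from the first post: the odd value 17592186044406 MB is exactly 2^44 - 10. That is what a size of -10 MB, held as a 64-bit byte count, would look like if it were converted to megabytes with an unsigned right shift by 20 bits. Whether Flink's local mode actually computes the figure this way is an assumption here, not something the log confirms, but it would be consistent with the "Size of total memory must be positive" error that follows. A quick check of the arithmetic in bash:

```shell
#!/usr/bin/env bash
# Assumption: the logged figure came from a negative byte count shifted right
# by 20 bits as an unsigned 64-bit value. Bash integers are signed 64-bit, so
# the unsigned shift is emulated by masking off the sign-extended upper 20 bits.
bytes=$(( -10 * 1024 * 1024 ))            # -10 MB expressed in bytes
mask=$(( (1 << 44) - 1 ))                 # keeps only the low 44 bits
unsigned_mb=$(( (bytes >> 20) & mask ))   # emulated unsigned shift by 20
echo "$unsigned_mb"                       # prints 17592186044406
```

If that reading is right, the memory size worked out in local mode was slightly negative, which fits the point that the settings from the config file are not what the in-process cluster ends up using.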

For running a Beam pipeline on YARN you have two options right now:

1. Start a Flink YARN session as described here https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session and wait until it is up and has printed the JobManager details. Then use those details to submit your Beam job as described here https://beam.apache.org/get-started/quickstart-java/#run-wordcount

2. Package your program as a fat jar. The quickstart from the Beam website should have the right settings in the POM for doing that. Then use regular Flink submission to submit that jar to a YARN cluster as described here https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#run-a-single-flink-job-on-yarn
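
On the command line, the two options might look roughly like the sketch below. The flags follow the Flink 1.3 YARN setup page and the Beam quickstart linked above, but every value is an assumption rather than a detail from this thread: the container count and memory sizes, the jar name `target/tfidf-bundled.jar`, the input path, and the `JOBMANAGER` address are all hypothetical placeholders. The script only prints the commands, so they can be reviewed before anything is run.

```shell
#!/usr/bin/env bash
# Dry-run sketch: prints the commands for both options instead of running them.
# All values below are hypothetical placeholders, not taken from the thread.
JAR="target/tfidf-bundled.jar"     # hypothetical fat jar name
INPUT="hdfs:///path/to/input"      # hypothetical input directory
JOBMANAGER="jobmanager-host:6123"  # hypothetical; use the address the YARN session prints

# Option 1, step 1: start a Flink YARN session (from the Flink distribution directory).
option1_session() {
  echo "./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096"
}

# Option 1, step 2: submit the Beam job to the JobManager of that session.
option1_submit() {
  echo "java -cp $JAR org.apache.beam.examples.complete.TfIdf" \
       "--runner=FlinkRunner --flinkMaster=$JOBMANAGER --filesToStage=$JAR" \
       "--input=$INPUT --output=tfIdf"
}

# Option 2: submit the fat jar as a single Flink job on YARN.
option2_run() {
  echo "./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096" \
       "-c org.apache.beam.examples.complete.TfIdf $JAR" \
       "--runner=FlinkRunner --input=$INPUT --output=tfIdf"
}

option1_session
option1_submit
option2_run
```

In both variants the TaskManager memory is requested through `-tm` or `-ytm` (in MB), so the memory settings apply to real YARN containers rather than to an in-process cluster.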

Best,
Aljoscha
