Hello Flinkers,
I have deployed Flink on a cluster of 17 nodes, each with 8 CPUs, so 136 CPUs are available in total. I have set taskmanager.numberOfTaskSlots = 8 on all machines, since they have 8 CPUs.

When I run ./flink run -c classpath jarFile -p 136, I get an error. The highest value I can use is 8, which is reasonable from one point of view. But [1] says the following:

parallelism.default: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program’s execution. Note: The default parallelism can be overwritten for an entire job by calling setParallelism(int parallelism) on the ExecutionEnvironment or by passing -p <parallelism> to the Flink Command-line frontend. It can be overwritten for single transformations by calling setParallelism(int parallelism) on an operator. See Parallel Execution for more information about parallelism.

In particular this part: "setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program’s execution."

So for me NumTaskManagers * NumSlotsPerTaskManager = 17 * 8 = 136, right? Any idea why this does not work?

Best,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
Go to the web UI and verify that all 136 TaskManagers are visible from the machine you are submitting the job from. I have encountered issues where not all TaskManagers start, or you may not have all 17 configured properly as one cluster rather than 17 clusters of 8.
Michael
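The web UI check can also be scripted. This sketch assumes the dashboard (port 8081 by default) serves an /overview REST endpoint whose JSON reports counts under the keys taskmanagers and slots-total; verify the endpoint and field names against the REST API docs for your Flink version. OverviewCheck is a hypothetical helper, and the regex is only there to avoid a JSON dependency.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts integer fields from the JSON assumed to be
// returned by http://<jobmanager-host>:8081/overview,
// e.g. {"taskmanagers":17,"slots-total":136,...}.
// A regex keeps the sketch dependency-free; real tooling should use a JSON library.
class OverviewCheck {

    static int intField(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(\\d+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + field);
        }
        return Integer.parseInt(m.group(1));
    }

    public static void main(String[] args) {
        // Example input only; paste the real response from your cluster here.
        String json = "{\"taskmanagers\":1,\"slots-total\":8,\"slots-available\":8}";
        int taskManagers = intField(json, "taskmanagers");
        int slots = intField(json, "slots-total");
        if (taskManagers != 17) {
            System.out.println("Only " + taskManagers + " TaskManager(s) registered; max parallelism is " + slots);
        }
    }
}
```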
No man. I have 17 TaskManagers, and each has 8 slots.
Do you think it is better to have 8 TaskManagers (1 slot each)?

Best,
Max
You need to verify that your configs are correct. Check that the local machine sees all the TaskManagers; that is the most likely reason it would reject a higher parallelism. I use a Java program to submit a job with parallelism 18 to a 3-node, 18-slot cluster without issue. However, I have not used the command line to do this.
Michael
Check that the slaves and masters files are set correctly on all machines, in particular on the one submitting jobs. Make sure the machine submitting the job is talking to the correct JobManager (jobmanager.rpc.address). It really sounds like you are somehow submitting jobs to only one TaskManager.
You should also use jps to verify that only one JobManager is running and that the worker machines run only a TaskManager.
Michael
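The masters/slaves/jobmanager.rpc.address check Michael suggests can be run on every node with a small script. This is a rough sketch assuming the standard Flink 1.4 conf/ layout (flink-conf.yaml, masters, slaves) and a non-HA setup with a single master; ConfCheck and its methods are hypothetical, and the YAML handling only understands flat "key: value" lines. For example, if jobmanager.rpc.address were left at localhost while masters lists a real hostname, the check would report a mismatch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical consistency check for a non-HA standalone cluster.
class ConfCheck {

    // Minimal "key: value" reader for flink-conf.yaml; skips blanks and comments.
    static Map<String, String> readConf(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            int colon = line.indexOf(':');
            if (colon < 0) continue;
            conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return conf;
    }

    // Host names from conf/masters (entries may be "host:webUIPort") or conf/slaves.
    static List<String> readHosts(List<String> lines) {
        List<String> hosts = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            int colon = line.indexOf(':');
            hosts.add(colon < 0 ? line : line.substring(0, colon));
        }
        return hosts;
    }

    // True if there is exactly one master and it matches jobmanager.rpc.address.
    static boolean consistent(Map<String, String> conf, List<String> masters) {
        String rpc = conf.get("jobmanager.rpc.address");
        return rpc != null && masters.size() == 1 && masters.get(0).equals(rpc);
    }

    public static void main(String[] args) throws IOException {
        Path confDir = Paths.get(args.length > 0 ? args[0] : "conf");
        Map<String, String> conf = readConf(Files.readAllLines(confDir.resolve("flink-conf.yaml")));
        List<String> masters = readHosts(Files.readAllLines(confDir.resolve("masters")));
        List<String> slaves = readHosts(Files.readAllLines(confDir.resolve("slaves")));
        System.out.println("jobmanager.rpc.address = " + conf.get("jobmanager.rpc.address"));
        System.out.println("masters = " + masters + ", slaves listed = " + slaves.size());
        System.out.println(consistent(conf, masters) ? "OK" : "MISMATCH: check masters vs jobmanager.rpc.address");
    }
}
```

Running the same check on every machine and comparing the printed values is the point: all nodes, including the one submitting jobs, should agree on the JobManager address.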
OK Michael!
I will look into it and get back to you! Thanks for the help. I agree that being capped at parallelism 8 is quite suspicious.

jps? Meaning?

Oh, I should mention that the JobManager node is also a TaskManager.

Best,
Max
On Thu, Apr 26, 2018 at 10:47 AM, Makis Pap <[hidden email]> wrote:
jps is a tool that comes with the JDK (see $JAVA_HOME/bin). It is modeled after the POSIX command ps (process status): jps = java ps, so it shows you the JVMs running on the given computer.
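The jps check can be scripted so it is easy to repeat on all 17 machines. This sketch assumes that Flink 1.4 standalone daemons show up in jps output under the main class names JobManager and TaskManager (newer Flink versions use different class names); JpsCheck is a hypothetical helper.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: counts Flink daemons in `jps` output.
class JpsCheck {

    // Each jps line looks like "<pid> <MainClass>"; count lines naming mainClass.
    static int count(List<String> jpsLines, String mainClass) {
        int n = 0;
        for (String line : jpsLines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length >= 2 && parts[1].equals(mainClass)) n++;
        }
        return n;
    }

    // Runs the JDK's jps tool (must be on the PATH) and returns its output lines.
    static List<String> runJps() throws IOException, InterruptedException {
        Process p = new ProcessBuilder("jps").redirectErrorStream(true).start();
        List<String> lines = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = r.readLine()) != null) lines.add(line);
        }
        p.waitFor();
        return lines;
    }

    public static void main(String[] args) throws Exception {
        List<String> lines = runJps();
        System.out.println("JobManager processes:  " + count(lines, "JobManager"));
        System.out.println("TaskManager processes: " + count(lines, "TaskManager"));
    }
}
```

Following Michael's advice, the master should report exactly one JobManager and each worker should report a TaskManager and no JobManager.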
For a cluster of that size, running a TaskManager on the JobManager node is not recommended.
Michael
Hi Michael!
It seems that you were correct. It is weird that I could not set parallelism = 136. So far I have not managed to configure the cluster properly, although I do everything as described in [1]. It seems that the JobManager is not reachable.

Best,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html
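One quick way to test "the JobManager is not reachable" from a worker is a plain TCP connect to the JobManager's RPC port. This is a sketch; the host and port are assumptions you should take from jobmanager.rpc.address and jobmanager.rpc.port (6123 by default), and JobManagerPing is a hypothetical helper.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: run on a TaskManager node to see whether the
// JobManager's RPC port accepts TCP connections.
class JobManagerPing {

    static boolean reachable(String host, int port, int timeoutMillis) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, unknown host, or timed out
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6123;
        System.out.println(host + ":" + port + (reachable(host, port, 3000) ? " reachable" : " NOT reachable"));
    }
}
```

A successful connect only shows that the port is open (no firewall in the way, correct host name). Akka can additionally be strict about the address it was bound to, so the TaskManagers and the client should use the same host name the JobManager was started with, which this check does not verify.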
The TaskManager cannot reach the JobManager. I get this error. Any ideas?
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.

Best,
Max
Guys, seriously, I have gone through the process described in the standalone cluster documentation 20 times. After I start the cluster with ./start-cluster.sh, jps shows the JobManager process running on the master and the TaskManager processes running on the slaves, as expected. Still, every time I try to run a simple Flink job I get the following:

Submitting job with JobID: fb7e19d45ec5e6572dff6e33f4c3aad4. Waiting for job completion.
------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
    at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
    at org.apache.flink.streamjoin.srhc.JobSRHC.main(JobSRHC.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
    at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
    at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:300)
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:219)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:104)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:71)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Weeeeell, you are my last resort, I guess! haha Please help if you have any ideas.

Best,
Max
Hi,

did you try to increase the Akka timeout [1]?

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/config.html#distributed-coordination-via-akka
Hello Fabian!
Thanks for the answer. No, I did not. Is this a requirement?

Best,
Max
It's not a requirement, but the exception reads "org.apache.flink.runtime. ...", so increasing the timeout might help.
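Raising the timeout means editing flink-conf.yaml. The sketch below sets a key in the file's lines, replacing an existing entry or appending a new one. The keys akka.ask.timeout and akka.client.timeout are taken from our reading of the Akka section of the Flink 1.4 configuration page Fabian linked (check it for your version); the values are placeholders, not a tuned recommendation, and ConfEdit is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: set "key: value" in flink-conf.yaml content.
// Commented-out entries are left alone; a missing key is appended.
class ConfEdit {

    static List<String> set(List<String> lines, String key, String value) {
        List<String> out = new ArrayList<>();
        boolean replaced = false;
        for (String line : lines) {
            String t = line.trim();
            if (!t.startsWith("#") && t.startsWith(key + ":")) {
                out.add(key + ": " + value); // overwrite the existing entry
                replaced = true;
            } else {
                out.add(line);
            }
        }
        if (!replaced) out.add(key + ": " + value);
        return out;
    }

    public static void main(String[] args) {
        List<String> conf = new ArrayList<>(Arrays.asList("jobmanager.rpc.address: node01"));
        conf = set(conf, "akka.ask.timeout", "60 s");     // placeholder value
        conf = set(conf, "akka.client.timeout", "300 s"); // placeholder value
        conf.forEach(System.out::println);
    }
}
```

Since each standalone daemon typically reads its configuration at startup, the edited file would need to reach every node (and the machine submitting the job), followed by a cluster restart.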
Hey Fabian!
Sorry for my lack of familiarity with Flink configuration, but I have followed every step, and setting up even a simple cluster of 2 nodes has proved to be a pain in the as@@#. So, what value do you think I should set the Akka timeout to?

Also, in my head the process is the following: set up the cluster, then transfer the fat jar and your data to the master and run the job there. The data are forwarded to the slaves, which compute the job. Is this correct?

Best,
Max