I have a streaming job that works in standalone cluster. Flink version is 1.4.1. Everything was working so far. But since I added new treatments, I can not start my job anymore. I have this exception :
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524) at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437) at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516) ... 11 more Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435) ... 12 more I see a very strange behavior. When I comment on a function (any one, for example a FilterFunction, which was present before or after my modification). I tried to change the configuration (akka.client.timeout and akka.framesize) without success. This is my flink-conf.yaml jobmanager.rpc.address: myhost jobmanager.rpc.port: 6123 jobmanager.heap.mb: 128 taskmanager.heap.mb: 1024 taskmanager.numberOfTaskSlots: 100 taskmanager.memory.preallocate: false taskmanager.data.port: 6121 parallelism.default: 1 taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr blob.storage.directory: /dohdev/flink/tmp/blob jobmanager.web.port: -1 high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /dohdev/flink high-availability.cluster-id: dev high-availability.storageDir: file:////mnt/metaflink high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 1000 restart-strategy.fixed-delay.delay: 5 s zookeeper.sasl.disable: true blob.service.cleanup.interval: 60 And I launch a job with this command : bin/flink run -d myjar.jar I added as an attachment a graph of my job when it works (Graph.PNG). Do you have an idea of the problem ? Thanks. Julien Graph.PNG (109K) Download Attachment |
Hi,
You have quite complicated job graph and very low memory settings for the job manager and task manager. It might be that long GC pauses are causing this problem. Secondly, there are quite some results in google search of this error that points toward high-availability issues. Have you read those previously reported problems? Thanks, Piotrek
|
Hi Piotrek,
Thank you for your answer. Actually it was necessary to increase the memory of the JobManager (I had tested it but I had not restarted Flink ...). I will also work on optimization. I thought it was good practice to create as much function as possible based on their functional value (for example: create two FilterFunctions that have a different functional meaning). So I will try to have fewer functions (for example: gather my two FilterFunctions in one). Thanks again Piotrek ! Julien. ----- Mail original ----- De: "Piotr Nowojski" <[hidden email]> À: [hidden email] Cc: [hidden email] Envoyé: Mardi 9 Octobre 2018 10:37:58 Objet: Re: JobManager did not respond within 60000 ms Hi, You have quite complicated job graph and very low memory settings for the job manager and task manager. It might be that long GC pauses are causing this problem. Secondly, there are quite some results in google search of this error that points toward high-availability issues. Have you read those previously reported problems? Thanks, Piotrek On 9 Oct 2018, at 09:57, [hidden email] wrote: I have a streaming job that works in standalone cluster. Flink version is 1.4.1. Everything was working so far. But since I added new treatments, I can not start my job anymore. I have this exception : org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524) at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402) at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802) at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437) at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516) ... 11 more Caused by: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435) ... 12 more I see a very strange behavior. When I comment on a function (any one, for example a FilterFunction, which was present before or after my modification). I tried to change the configuration (akka.client.timeout and akka.framesize) without success. This is my flink-conf.yaml jobmanager.rpc.address: myhost jobmanager.rpc.port: 6123 jobmanager.heap.mb: 128 taskmanager.heap.mb: 1024 taskmanager.numberOfTaskSlots: 100 taskmanager.memory.preallocate: false taskmanager.data.port: 6121 parallelism.default: 1 taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr blob.storage.directory: /dohdev/flink/tmp/blob jobmanager.web.port: -1 high-availability: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /dohdev/flink high-availability.cluster-id: dev high-availability.storageDir: file:////mnt/metaflink high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 1000 restart-strategy.fixed-delay.delay: 5 s zookeeper.sasl.disable: true blob.service.cleanup.interval: 60 And I launch a job with this command : bin/flink run -d myjar.jar I added as an attachment a graph of my job when it works (Graph.PNG). Do you have an idea of the problem ? Thanks. Julien <Graph.PNG> |
Hi again,
Glad that you solved your problem :) Splitting code into smaller functions has its advantages, but more operators/tasks means more overhead for JobManager/TaskManager to manage them. Usually that’s not a big issue, but as I said, you were running your cluster on extremely low memory settings. Piotrek > On 9 Oct 2018, at 18:09, [hidden email] wrote: > > Hi Piotrek, > > Thank you for your answer. Actually it was necessary to increase the memory of the JobManager (I had tested it but I had not restarted Flink ...). > > I will also work on optimization. I thought it was good practice to create as much function as possible based on their functional value (for example: create two FilterFunctions that have a different functional meaning). So I will try to have fewer functions (for example: gather my two FilterFunctions in one). > > Thanks again Piotrek ! > > Julien. > > ----- Mail original ----- > De: "Piotr Nowojski" <[hidden email]> > À: [hidden email] > Cc: [hidden email] > Envoyé: Mardi 9 Octobre 2018 10:37:58 > Objet: Re: JobManager did not respond within 60000 ms > > Hi, > > > You have quite complicated job graph and very low memory settings for the job manager and task manager. It might be that long GC pauses are causing this problem. > > > Secondly, there are quite some results in google search of this error that points toward high-availability issues. Have you read those previously reported problems? > > > Thanks, Piotrek > > > > > > On 9 Oct 2018, at 09:57, [hidden email] wrote: > > > I have a streaming job that works in standalone cluster. Flink version is 1.4.1. Everything was working so far. But since I added new treatments, I can not start my job anymore. I have this exception : > > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms > at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524) > at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456) > at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402) > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282) > at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054) > at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101) > at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098) > at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098) > Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms > at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437) > at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516) > ... 11 more > Caused by: java.util.concurrent.TimeoutException > at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771) > at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) > at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435) > ... 12 more > > I see a very strange behavior. When I comment on a function (any one, for example a FilterFunction, which was present before or after my modification). > I tried to change the configuration (akka.client.timeout and akka.framesize) without success. > > This is my flink-conf.yaml > jobmanager.rpc.address: myhost > jobmanager.rpc.port: 6123 > jobmanager.heap.mb: 128 > taskmanager.heap.mb: 1024 > taskmanager.numberOfTaskSlots: 100 > taskmanager.memory.preallocate: false > taskmanager.data.port: 6121 > parallelism.default: 1 > taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr > blob.storage.directory: /dohdev/flink/tmp/blob > jobmanager.web.port: -1 > high-availability: zookeeper > high-availability.zookeeper.quorum: localhost:2181 > high-availability.zookeeper.path.root: /dohdev/flink > high-availability.cluster-id: dev > high-availability.storageDir: file:////mnt/metaflink > high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery > restart-strategy: fixed-delay > restart-strategy.fixed-delay.attempts: 1000 > restart-strategy.fixed-delay.delay: 5 s > zookeeper.sasl.disable: true > blob.service.cleanup.interval: 60 > > And I launch a job with this command : bin/flink run -d myjar.jar > > I added as an attachment a graph of my job when it works (Graph.PNG). > > Do you have an idea of the problem ? > > Thanks. > Julien > > > <Graph.PNG> |
Free forum by Nabble | Edit this page |