I have a streaming job that runs on a standalone cluster. The Flink version is 1.4.1. Everything was working until now, but since I added new processing steps, I can no longer start my job. I get this exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 ms
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:524)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:103)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:77)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:402)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 ms
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:437)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:516)
... 11 more
Caused by: java.util.concurrent.TimeoutException
at java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:435)
... 12 more
I see very strange behavior: when I comment out a function (any one, for example a FilterFunction, whether it was present before my modification or added by it), the job starts again.
I tried to change the configuration (akka.client.timeout and akka.framesize) without success.
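For reference, a minimal sketch of how these two keys are written in flink-conf.yaml. The values are illustrative assumptions, not the ones actually tested. As far as I understand, akka.client.timeout is read by the client, so it would need to be set in the configuration used by the machine that runs bin/flink.

```yaml
# Illustrative values only (assumptions), not the ones from the tests above.
# Client-side timeout; the default is 60 s, which matches the
# "did not respond within 60000 ms" in the exception.
akka.client.timeout: 600 s
# Maximum Akka message size; the default is 10485760b (10 MB).
akka.framesize: 52428800b
```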
This is my flink-conf.yaml:
jobmanager.rpc.address: myhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 128
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 100
taskmanager.memory.preallocate: false
taskmanager.data.port: 6121
parallelism.default: 1
taskmanager.tmp.dirs: /dohdev/flink/tmp/tskmgr
blob.storage.directory: /dohdev/flink/tmp/blob
jobmanager.web.port: -1
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /dohdev/flink
high-availability.cluster-id: dev
high-availability.storageDir: file:////mnt/metaflink
high-availability.zookeeper.storageDir: /mnt/metaflink/inh/agregateur/recovery
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 1000
restart-strategy.fixed-delay.delay: 5 s
zookeeper.sasl.disable: true
blob.service.cleanup.interval: 60
I launch the job with this command: bin/flink run -d myjar.jar
I have attached a graph of my job when it works (Graph.PNG).
Do you have any idea what the problem is?
Thanks.
Julien
<Graph.PNG>