Hi
I am trying to build a Flink cluster with 1 master and 2 workers. The program works fine locally: messages are read from Kafka and simply printed to STDOUT. The cluster is created successfully and the UI shows all the config, but the job fails to execute on the cluster. Here are a few of the exceptions I see in the log files.

File: flink-root-standalonesession

2020-01-29 19:55:00,348 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,350 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,359 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
2020-01-29 19:55:00,359 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.

File: flink-root-client-ip

2020-01-29 19:48:10,566 WARN org.apache.flink.client.cli.CliFrontend - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
    at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more
2020-01-29 19:48:10,663 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-01-29 19:48:10,856 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,874 INFO org.apache.flink.runtime.security.SecurityUtils - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,875 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
2020-01-29 19:48:10,881 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
2020-01-29 19:48:10,965 INFO org.apache.flink.configuration.Configuration - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-01-29 19:48:11,160 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
2020-01-29 19:48:11,163 INFO org.apache.flink.client.cli.CliFrontend - Starting execution of program
2020-01-29 19:48:11,163 INFO org.apache.flink.client.program.rest.RestClusterClient - Starting program in interactive mode (detached: false)
2020-01-29 19:48:11,306 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, ip
2020-01-29 19:48:11,306 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 4
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-01-29 19:48:11,307 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: io.tmp.dirs, /tmp/flink
2020-01-29 19:48:11,311 INFO org.apache.flink.client.program.rest.RestClusterClient - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
2020-01-29 20:05:13,170 INFO org.apache.flink.runtime.rest.RestClient - Shutting down rest endpoint.
2020-01-29 20:05:13,172 INFO org.apache.flink.runtime.rest.RestClient - Rest endpoint shutdown complete.
2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f4cce35db3f37cae310f272ec88a303)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
    ... 18 more
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

Flink version: flink-1.9.1
OS: CentOS Linux release 7.6.1810 (Core)

Is this related to this issue:
Can somebody throw some light on this?
Hi,

I don't think this is a bug. It looks like the machines cannot talk to each other. Can you validate that all the machines can talk to each other on the ports used by Flink (6123, 8081, ...)?

If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers?
- Is there a firewall involved?

Best,
Robert

On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <[hidden email]> wrote:
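One way to do the check Robert suggests is a small TCP probe run from each machine. The sketch below is hypothetical (PortCheck is a made-up name, not a Flink tool) and only tests whether a TCP handshake to host:port completes within a timeout; it does not speak any Flink protocol. The address 10.0.16.10 and the ports 6123 and 8081 in the usage comment are taken from this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Minimal TCP reachability probe: tries to open a connection to host:port
// and reports whether the handshake completed within the timeout.
final class PortCheck {

    private PortCheck() {}

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeouts and unresolvable hosts all end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java PortCheck 10.0.16.10 6123 8081
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        for (int i = 1; i < args.length; i++) {
            int port = Integer.parseInt(args[i]);
            System.out.printf("%s:%d -> %s%n", host, port,
                    isReachable(host, port, 2000) ? "open" : "NOT reachable");
        }
    }
}
```

Running it on every node against every other node matters, because a port can be open on one host and closed on another.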
The cluster is set up on AWS with 1 JobManager and 2 TaskManagers. They all belong to the same security group, which grants access on ports 6123, 8081 and 50100-50200.

The JobManager config is as follows:

FLINK_PLUGINS_DIR : /usr/local/flink-1.9.1/plugins
io.tmp.dirs : /tmp/flink
jobmanager.execution.failover-strategy : region
jobmanager.heap.size : 1024m
jobmanager.rpc.address : 10.0.16.10
jobmanager.rpc.port : 6123
jobstore.cache-size : 52428800
jobstore.expiration-time : 3600
parallelism.default : 4
slot.idle.timeout : 50000
slot.request.timeout : 300000
task.cancellation.interval : 30000
task.cancellation.timeout : 180000
task.cancellation.timers.timeout : 7500
taskmanager.exit-on-fatal-akka-error : false
taskmanager.heap.size : 1024m
taskmanager.network.bind-policy : "ip"
taskmanager.numberOfTaskSlots : 2
taskmanager.registration.initial-backoff : 500ms
taskmanager.registration.timeout : 5min
taskmanager.rpc.port : 50100-50200
web.tmpdir : /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0

I have summarised more details in a Stack Overflow question, where it is easier to lay them all out.

On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <[hidden email]> wrote:
I figured out that it was a problem with the ports: 39493 and 34094 were not accessible. To get this working, I opened all ports (0-65535) for the security group. How can I restrict this if I only want to open a certain range of ports? Is "taskmanager.rpc.port" the right parameter to set? I did try setting it to a port range, but that did not work.

Thanks,
Milind

On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <[hidden email]> wrote:
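Once every Flink port is pinned to a known port or range, the security group only needs rules for those. As a sketch, assuming Flink's port syntax of single ports and lo-hi ranges separated by commas, the hypothetical helper below (SecurityGroupRanges is a made-up name; it does not call any AWS API) merges the ports and ranges mentioned in this thread into the fewest contiguous ranges.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: turns port specs into a minimal list of inclusive ranges to open.
final class SecurityGroupRanges {

    private SecurityGroupRanges() {}

    /** Expands specs like "6123", "50100-50200" or "50100,50105" into inclusive [lo, hi] pairs. */
    static List<int[]> parse(String... specs) {
        List<int[]> out = new ArrayList<>();
        for (String spec : specs) {
            for (String part : spec.split(",")) {
                String p = part.trim();
                if (p.isEmpty()) continue;
                int dash = p.indexOf('-');
                int lo = Integer.parseInt((dash < 0 ? p : p.substring(0, dash)).trim());
                int hi = dash < 0 ? lo : Integer.parseInt(p.substring(dash + 1).trim());
                if (lo > hi || lo < 1 || hi > 65535) {
                    throw new IllegalArgumentException("bad port spec: " + p);
                }
                out.add(new int[] {lo, hi});
            }
        }
        return out;
    }

    /** Sorts the ranges and merges overlapping or adjacent ones, e.g. 50100-50200 + 50201-50300. */
    static List<int[]> merge(List<int[]> ranges) {
        List<int[]> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingInt(r -> r[0]));
        List<int[]> merged = new ArrayList<>();
        for (int[] r : sorted) {
            int[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && r[0] <= last[1] + 1) {
                last[1] = Math.max(last[1], r[1]); // extend the previous range
            } else {
                merged.add(new int[] {r[0], r[1]}); // copy so the input is not mutated
            }
        }
        return merged;
    }

    /** Renders ranges as "6123, 8081, 50100-50300". */
    static String format(List<int[]> ranges) {
        List<String> parts = new ArrayList<>();
        for (int[] r : ranges) {
            parts.add(r[0] == r[1] ? Integer.toString(r[0]) : r[0] + "-" + r[1]);
        }
        return String.join(", ", parts);
    }

    public static void main(String[] args) {
        // 6123 and 8081 are the JobManager ports from this thread; the ranges are the ones tried later on.
        List<int[]> ranges = parse("6123", "8081", "50100-50200", "50201-50300");
        System.out.println(format(merge(ranges))); // 6123, 8081, 50100-50300
    }
}
```

Keeping all Flink port ranges adjacent, as in this example, collapses them into a single security-group rule.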
Maybe you forgot to restrict the blob server port (blob.server.port) to the range as well.

Best,
Yang

On Fri, Feb 7, 2020 at 7:03 AM Milind Vaidya <[hidden email]> wrote:
I tried setting that option, but it did not work:

2020-02-07 19:28:45,999 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093 (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager
2020-02-07 19:28:46,425 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552 (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager

My settings are as follows:

taskmanager.rpc.port : 50100-50200
blob.server.port : 50201-50300

So how do I control the port of the TaskManager? In spite of the above settings, the TaskManagers are registering on ports 34718 and 37120.

Thanks,
Milind

On Thu, Feb 6, 2020 at 5:25 PM Yang Wang <[hidden email]> wrote:
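To see at a glance which addresses fall outside the opened ranges, the sketch below pulls the port out of each akka.tcp:// address in a log excerpt and checks it against the configured ranges. AkkaPortAudit is a hypothetical helper, not part of Flink, and it assumes Flink's port syntax of single ports and lo-hi ranges separated by commas. Fed the two registration lines quoted above, it reports both 34718 and 37120 as outside 50100-50300.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log audit: lists Akka ports that are not covered by the allowed ranges.
final class AkkaPortAudit {

    private AkkaPortAudit() {}

    // Captures the port in addresses such as akka.tcp://flink@ip-1:34718/user/taskmanager_0
    private static final Pattern AKKA_ADDRESS =
            Pattern.compile("akka\\.tcp://[^@\\s]+@[^:\\s/]+:(\\d+)");

    /** Parses "50100-50200" or "50100,50105,50201-50300" into inclusive [lo, hi] pairs. */
    static List<int[]> parseRanges(String spec) {
        List<int[]> ranges = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            if (p.isEmpty()) continue;
            int dash = p.indexOf('-');
            if (dash < 0) {
                int port = Integer.parseInt(p);
                ranges.add(new int[] {port, port});
            } else {
                int lo = Integer.parseInt(p.substring(0, dash).trim());
                int hi = Integer.parseInt(p.substring(dash + 1).trim());
                ranges.add(new int[] {lo, hi});
            }
        }
        return ranges;
    }

    static boolean inRanges(int port, List<int[]> ranges) {
        for (int[] r : ranges) {
            if (port >= r[0] && port <= r[1]) return true;
        }
        return false;
    }

    /** Returns every Akka port found in the log text that lies outside all allowed ranges. */
    static List<Integer> portsOutside(String logText, List<int[]> allowed) {
        List<Integer> offenders = new ArrayList<>();
        Matcher m = AKKA_ADDRESS.matcher(logText);
        while (m.find()) {
            int port = Integer.parseInt(m.group(1));
            if (!inRanges(port, allowed)) offenders.add(port);
        }
        return offenders;
    }

    public static void main(String[] args) {
        List<int[]> allowed = new ArrayList<>();
        allowed.addAll(parseRanges("50100-50200")); // taskmanager.rpc.port
        allowed.addAll(parseRanges("50201-50300")); // blob.server.port
        String log = "Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093 "
                + "(akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager\n"
                + "Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552 "
                + "(akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager";
        // Prints: Ports outside the opened ranges: [34718, 37120]
        System.out.println("Ports outside the opened ranges: " + portsOutside(log, allowed));
    }
}
```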
Hey Milind,

Can you also restrict metrics.internal.query-service.port to a port range?

Best,
Robert

On Fri, Feb 7, 2020 at 8:35 PM Milind Vaidya <[hidden email]> wrote:
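Robert's suggestion amounts to making sure no port-related key is left at its default. A minimal sketch of that check, assuming that an absent key or a value of 0 lets the port be picked at random (consistent with the ephemeral ports in the logs above), reads flink-conf.yaml-style lines and lists which of taskmanager.rpc.port, blob.server.port and metrics.internal.query-service.port are still unpinned. PortKeyCheck is a made-up name, and the key list covers only the keys discussed in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical config check: reports port keys that are not pinned to a fixed port or range.
final class PortKeyCheck {

    private PortKeyCheck() {}

    // The port keys mentioned in this thread; this list is an assumption and not exhaustive.
    static final List<String> PORT_KEYS = Arrays.asList(
            "taskmanager.rpc.port",
            "blob.server.port",
            "metrics.internal.query-service.port");

    /** Parses flink-conf.yaml-style "key: value" lines, skipping blanks and '#' comments. */
    static Map<String, String> parse(String text) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : text.split("\\r?\\n")) {
            String l = line.trim();
            if (l.isEmpty() || l.startsWith("#")) continue;
            int colon = l.indexOf(':');
            if (colon < 0) continue;
            conf.put(l.substring(0, colon).trim(), l.substring(colon + 1).trim());
        }
        return conf;
    }

    /** Returns the port keys that are absent or "0", i.e. assumed to fall back to a random port. */
    static List<String> unpinned(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : PORT_KEYS) {
            String value = conf.get(key);
            if (value == null || value.equals("0")) missing.add(key);
        }
        return missing;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "jobmanager.rpc.port: 6123",
                "taskmanager.rpc.port: 50100-50200",
                "blob.server.port: 50201-50300");
        // Prints: Still unpinned: [metrics.internal.query-service.port]
        System.out.println("Still unpinned: " + unpinned(parse(conf)));
    }
}
```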