Failed to transfer file from TaskExecutor : Vanilla Flink Cluster



Milind Vaidya
Hi 

I am trying to build a Flink cluster with 1 master and 2 workers.
The program works fine locally: the messages are read from Kafka and simply printed to STDOUT.

The cluster is created successfully and the UI shows all the configuration, but the job fails to execute on the cluster.

Here are a few exceptions I see in the log files:

File : flink-root-standalonesession

2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler  - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
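The endpoints that could not be reached can be pulled out of a log like the one above mechanically. A minimal sketch, assuming the log lines keep the exact wording shown in this post ("connection timed out: /host:port" and "akka.tcp://system@host:port"); the function names are hypothetical helpers, not part of Flink:

```python
import re
from typing import Dict, Set, Tuple

# Netty connect timeouts, e.g. "...ConnectTimeoutException: connection timed out: /ip:39493"
TIMEOUT_RE = re.compile(r"connection timed out: /?([^\s:]+):(\d+)")
# Akka remote addresses, e.g. "akka.tcp://flink-metrics@ip:39493"
AKKA_RE = re.compile(r"akka\.tcp://([\w-]+)@([^\s:\]]+):(\d+)")

# Two of the lines from the log excerpt above (shortened), used as sample input.
SAMPLE = (
    "Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed\n"
    "Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed\n"
    "Remote connection to [null] failed with ConnectTimeoutException: "
    "connection timed out: /ip:39493\n"
    "Remote connection to [null] failed with ConnectTimeoutException: "
    "connection timed out: /ip:34094\n"
)


def unreachable_endpoints(log_text: str) -> Set[Tuple[str, int]]:
    """Collect the (host, port) pairs whose TCP connect timed out."""
    return {(host, int(port)) for host, port in TIMEOUT_RE.findall(log_text)}


def actor_systems_by_port(log_text: str) -> Dict[int, str]:
    """Map each port found in an akka.tcp address to its actor-system name."""
    return {int(port): system for system, _host, port in AKKA_RE.findall(log_text)}


if __name__ == "__main__":
    print(sorted(unreachable_endpoints(SAMPLE)))
    print(actor_systems_by_port(SAMPLE))
```

On this excerpt, both timed-out ports belong to the `flink-metrics` actor system, which narrows down which component's ports are blocked.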


File : flink-root-client-ip


2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend                       - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more
2020-01-29 19:48:10,663 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-01-29 19:48:10,856 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,874 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,875 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2020-01-29 19:48:10,881 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2020-01-29 19:48:10,965 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-01-29 19:48:11,160 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: false)
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, ip
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: io.tmp.dirs, /tmp/flink
2020-01-29 19:48:11,311 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
2020-01-29 20:05:13,170 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2020-01-29 20:05:13,172 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f4cce35db3f37cae310f272ec88a303)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
        ... 18 more
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)


Flink version : flink-1.9.1
OS : CentOS Linux release 7.6.1810 (Core)

Is this related to this issue :

Can somebody shed some light on this?




Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

rmetzger0
Hi,

I don't think this is a bug. It looks like the machines cannot talk to each other. Can you verify that all the machines can reach each other on the ports used by Flink (6123, 8081, ...)?
If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers? 
- Is there a firewall involved?
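One way to check reachability from each machine is a plain TCP connect per port. A minimal sketch using only the Python standard library; `probe` and `probe_all` are hypothetical helper names, and the 3-second timeout is an arbitrary choice. It separates "refused" (the host answered, but nothing listens on the port) from "timeout" (no answer at all, which typically points to a firewall or security-group rule silently dropping packets, consistent with the "connection timed out" lines in the logs):

```python
import socket
from typing import Dict, Iterable


def probe(host: str, port: int, timeout: float = 3.0) -> str:
    """Try a TCP connect to host:port and classify the outcome."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return "open"
    except ConnectionRefusedError:
        # The host replied with a reset: reachable, but nothing listens there.
        return "refused"
    except socket.timeout:
        # No reply at all: packets are likely being dropped along the way.
        return "timeout"
    except OSError as exc:
        # Anything else, e.g. DNS failure or "no route to host".
        return f"error: {exc}"


def probe_all(host: str, ports: Iterable[int], timeout: float = 3.0) -> Dict[int, str]:
    """Probe several ports on one host and return port -> outcome."""
    return {port: probe(host, port, timeout) for port in ports}
```

For example, run `probe_all(<JobManager address>, [6123, 8081])` on each worker, and probe each worker's ports from the master as well, since a connection working in one direction says nothing about the reverse direction.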

Best,
Robert


On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <[hidden email]> wrote:
[...]




Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

Milind Vaidya



The cluster is set up on AWS with 1 JobManager and 2 TaskManagers.
They all belong to the same security group, with access granted on ports 6123, 8081 and 50100-50200.

The JobManager configuration is as follows:

FLINK_PLUGINS_DIR                       :   /usr/local/flink-1.9.1/plugins
io.tmp.dirs                             :   /tmp/flink
jobmanager.execution.failover-strategy  :   region
jobmanager.heap.size                    :   1024m
jobmanager.rpc.address                  :   10.0.16.10
jobmanager.rpc.port                     :   6123
jobstore.cache-size                     :   52428800
jobstore.expiration-time                :   3600
parallelism.default                     :   4
slot.idle.timeout                       :   50000
slot.request.timeout                    :   300000
task.cancellation.interval              :   30000
task.cancellation.timeout               :   180000
task.cancellation.timers.timeout        :   7500
taskmanager.exit-on-fatal-akka-error    :   false
taskmanager.heap.size                   :   1024m
taskmanager.network.bind-policy         :   "ip"
taskmanager.numberOfTaskSlots           :   2
taskmanager.registration.initial-backoff:   500ms
taskmanager.registration.timeout        :   5min
taskmanager.rpc.port                    :   50100-50200
web.tmpdir                              :   /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0
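This configuration pins `taskmanager.rpc.port` into the opened range 50100-50200, but nothing in it pins the ports of the `flink-metrics` actor system that appear in the earlier logs (39493, 34094). A quick sketch comparing those observed ports against the security-group rules described above; the rules are transcribed as inclusive (low, high) ranges, and `is_open` is a hypothetical helper:

```python
from typing import List, Tuple

# Security-group rules from this post, as inclusive (low, high) port ranges.
OPEN_RANGES: List[Tuple[int, int]] = [(6123, 6123), (8081, 8081), (50100, 50200)]


def is_open(port: int, ranges: List[Tuple[int, int]] = OPEN_RANGES) -> bool:
    """True if some rule's range contains the port."""
    return any(low <= port <= high for low, high in ranges)


# Ports the JobManager failed to reach, taken from the log excerpt in the first post.
observed = [39493, 34094]
blocked = [port for port in observed if not is_open(port)]

if __name__ == "__main__":
    print("blocked:", blocked)
```

Both observed ports fall outside every rule, which matches the connect timeouts in the logs.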



I have summarised more details in a Stack Overflow question, where it is easier to lay out the various details.

On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <[hidden email]> wrote:
Hi,

I don't think this is a bug. It looks like the machines can not talk to each other. Can you validate that all the machines can talk to each other on the ports used by Flink (6123, 8081, ...) 
If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers? 
- Is there a firewall involved?

Best,
Robert


On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <[hidden email]> wrote:
Hi 

I am trying to build a cluster for flink with 1 master and 2 workers.
The program is working fine locally. The messages are read from Kafka and just printed on STDOUT.

The cluster is successfully created and UI is also shows all config. But the job fails to execute on the cluster.

Here are few exceptions I see in the log files 

File : flink-root-standalonesession

2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response
 from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response f
rom remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler  - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.


File : flink-root-client-ip


2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend                       - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more
2020-01-29 19:48:10,663 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-01-29 19:48:10,856 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,874 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,875 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2020-01-29 19:48:10,881 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2020-01-29 19:48:10,965 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-01-29 19:48:11,160 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: false)
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, ip
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: io.tmp.dirs, /tmp/flink
2020-01-29 19:48:11,311 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
2020-01-29 20:05:13,170 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2020-01-29 20:05:13,172 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f4cce35db3f37cae310f272ec88a303)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
        ... 18 more
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)


Flink version : flink-1.9.1
OS : CentOS Linux release 7.6.1810 (Core)

Is this related to this issue:

Can somebody shed some light on this?




Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

Milind Vaidya
I figured out that it was a problem with the ports: 39493 and 34094 were not accessible. To get this working, I opened all ports (0-65535) in the security group.

How can I restrict this if I want to open only a certain range of ports?

Is "taskmanager.rpc.port" the right parameter to set? I tried setting it to a port range, but it did not work.
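Before narrowing the security group again, it can help to check mechanically which of the ports seen in the logs fall outside the opened ranges. Below is a minimal sketch in Java. It assumes the "list of ports and/or low-high ranges" syntax that Flink documents for port options such as taskmanager.rpc.port. PortRanges is a hypothetical helper, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: parses a port specification written as comma-separated
 * single ports and inclusive ranges (e.g. "6123,8081,50100-50200") and reports
 * which observed ports it does not cover. This is an illustration, not Flink code.
 */
final class PortRanges {
    // Each entry is an inclusive {low, high} pair.
    private final List<int[]> ranges = new ArrayList<>();

    static PortRanges parse(String spec) {
        PortRanges result = new PortRanges();
        for (String part : spec.split(",")) {
            String token = part.trim();
            if (token.isEmpty()) {
                continue; // tolerate stray commas such as "6123,,8081"
            }
            int dash = token.indexOf('-');
            int low = Integer.parseInt(dash < 0 ? token : token.substring(0, dash).trim());
            int high = dash < 0 ? low : Integer.parseInt(token.substring(dash + 1).trim());
            if (low < 0 || high > 65535 || low > high) {
                throw new IllegalArgumentException("Invalid port or range: " + token);
            }
            result.ranges.add(new int[] {low, high});
        }
        return result;
    }

    boolean contains(int port) {
        for (int[] range : ranges) {
            if (port >= range[0] && port <= range[1]) {
                return true;
            }
        }
        return false;
    }

    // Returns the given ports that fall outside every configured range, in input order.
    List<Integer> uncovered(int... ports) {
        List<Integer> missing = new ArrayList<>();
        for (int port : ports) {
            if (!contains(port)) {
                missing.add(port);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Ports opened in the security group, as listed in the thread.
        PortRanges allowed = PortRanges.parse("6123,8081,50100-50200");
        // 39493 and 34094 are the ports that timed out in the JobManager log.
        System.out.println(allowed.uncovered(39493, 34094, 6123, 50150));
    }
}
```

Running it prints [39493, 34094]. Those are exactly the two ports that were not accessible, so neither falls inside any rule that was opened.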

Thanks 
Milind 

On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <[hidden email]> wrote:



The cluster is set up on AWS with 1 JobManager and 2 TaskManagers.
They all belong to the same security group, with access granted on ports 6123, 8081 and 50100-50200.

The JobManager configuration is as follows:

FLINK_PLUGINS_DIR                       :   /usr/local/flink-1.9.1/plugins
io.tmp.dirs                             :   /tmp/flink
jobmanager.execution.failover-strategy  :   region
jobmanager.heap.size                    :   1024m
jobmanager.rpc.address                  :   10.0.16.10
jobmanager.rpc.port                     :   6123
jobstore.cache-size                     :   52428800
jobstore.expiration-time                :   3600
parallelism.default                     :   4
slot.idle.timeout                       :   50000
slot.request.timeout                    :   300000
task.cancellation.interval              :   30000
task.cancellation.timeout               :   180000
task.cancellation.timers.timeout        :   7500
taskmanager.exit-on-fatal-akka-error    :   false
taskmanager.heap.size                   :   1024m
taskmanager.network.bind-policy         :   "ip"
taskmanager.numberOfTaskSlots           :   2
taskmanager.registration.initial-backoff:   500ms
taskmanager.registration.timeout        :   5min
taskmanager.rpc.port                    :   50100-50200
web.tmpdir                              :   /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0



I have summarised more details in a Stack Overflow question, where it is easier to lay them all out.

On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <[hidden email]> wrote:
Hi,

I don't think this is a bug. It looks like the machines cannot talk to each other. Can you verify that all the machines can reach each other on the ports used by Flink (6123, 8081, ...)?
If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers? 
- Is there a firewall involved?

Best,
Robert
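Robert's first question (can every machine reach every other machine on the Flink ports?) is easy to script. Below is a minimal sketch. It assumes Java 11+, so the file can be launched directly with "java PortProbe.java". The class name and the 2-second timeout are arbitrary choices. The sketch distinguishes a timeout from a refused connection, because the ConnectTimeoutException entries in the original logs suggest that packets were being dropped rather than rejected.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;

/**
 * Hypothetical helper: attempts a TCP connection to host:port and classifies the result.
 * Run it on each machine against every other machine and every Flink port.
 */
final class PortProbe {
    static String probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return "open";
        } catch (SocketTimeoutException e) {
            // No answer at all: often a firewall or security group silently dropping packets.
            return "timeout";
        } catch (ConnectException e) {
            // The host answered, but usually nothing is listening on that port.
            return "refused";
        } catch (IOException e) {
            // Anything else, e.g. an unresolvable host name or no route to host.
            return "error: " + e;
        }
    }

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("usage: java PortProbe.java <host> <port> [<port> ...]");
            return;
        }
        String host = args[0];
        for (int i = 1; i < args.length; i++) {
            int port = Integer.parseInt(args[i]);
            System.out.println(host + ":" + port + " -> " + probe(host, port, 2000));
        }
    }
}
```

For example, run "java PortProbe.java 10.0.16.10 6123 8081" from a TaskManager host. Here 10.0.16.10 is the jobmanager.rpc.address from the configuration in this thread.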


On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <[hidden email]> wrote:
[...]



Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

Yang Wang
Maybe you forgot to limit the blob server port (blob.server.port) to the range.
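Yang's point can be framed as a single check: every port option that a Flink process binds should be confined to ports that the security group actually opens. Below is a minimal sketch of that check in Java. PortPolicyCheck is a hypothetical helper, and the option values in main are examples only. Treating "0" as "any free port" is an assumption about how these options behave when left unconstrained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical check: is every port a Flink option may bind to also opened in
 * the security group? Specs use comma-separated ports and low-high ranges.
 */
final class PortPolicyCheck {
    // True only if every port described by 'spec' lies inside 'allowed'.
    static boolean covered(String spec, String allowed) {
        for (String part : spec.split(",")) {
            int[] range = parseRange(part);
            if (range[0] == 0) {
                return false; // port 0 stands for "any free port", which no fixed rule can cover
            }
            for (int port = range[0]; port <= range[1]; port++) {
                if (!inAny(port, allowed)) {
                    return false;
                }
            }
        }
        return true;
    }

    private static boolean inAny(int port, String spec) {
        for (String part : spec.split(",")) {
            int[] range = parseRange(part);
            if (port >= range[0] && port <= range[1]) {
                return true;
            }
        }
        return false;
    }

    // Parses "50100" or "50100-50200" into an inclusive {low, high} pair.
    private static int[] parseRange(String token) {
        String t = token.trim();
        int dash = t.indexOf('-');
        int low = Integer.parseInt(dash < 0 ? t : t.substring(0, dash).trim());
        int high = dash < 0 ? low : Integer.parseInt(t.substring(dash + 1).trim());
        return new int[] {low, high};
    }

    public static void main(String[] args) {
        String allowed = "6123,8081,50100-50200"; // security-group ports from the thread
        Map<String, String> options = new LinkedHashMap<>();
        options.put("taskmanager.rpc.port", "50100-50200");
        options.put("blob.server.port", "0"); // example value: left unconstrained
        for (Map.Entry<String, String> option : options.entrySet()) {
            boolean ok = covered(option.getValue(), allowed);
            System.out.println(option.getKey() + " = " + option.getValue()
                    + (ok ? " -> covered" : " -> NOT covered"));
        }
    }
}
```

Under these example values, the sketch reports the RPC range as covered and the unconstrained blob port as not covered. A range that only partially overlaps the opened ports (for instance 50150-50250) is also reported as not covered.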


Best,
Yang

On Fri, Feb 7, 2020 at 7:03 AM Milind Vaidya <[hidden email]> wrote:
[...]




Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

Milind Vaidya
I tried setting that option, but it did not work.

2020-02-07 19:28:45,999 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093 (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager
2020-02-07 19:28:46,425 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552 (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager

My settings are as follows:

taskmanager.rpc.port : 50100-50200
blob.server.port : 50201-50300

So how do I control the TaskManager port? In spite of the above settings, the TaskManagers are still registering on ports 34718 and 37120.
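The registration lines above already show which ports the TaskManagers bound. The small sketch below pulls the port out of each akka.tcp://system@host:port address and compares it with the configured taskmanager.rpc.port range. The regular expression assumes the address format seen in these logs. RegisteredPorts is a hypothetical helper name. The same pattern also matches the flink-metrics addresses from the first post, so it can list every port a security group would have to allow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: extracts the port from Akka addresses in log lines such as
 * "akka.tcp://flink@ip-1:34718/user/taskmanager_0" and compares it with a configured range.
 */
final class RegisteredPorts {
    // group(1) = actor system name, group(2) = host, group(3) = port
    private static final Pattern AKKA_ADDRESS =
            Pattern.compile("akka\\.tcp://([^@\\s]+)@([^:/\\s\\]]+):(\\d+)");

    static List<Integer> ports(String logText) {
        List<Integer> result = new ArrayList<>();
        Matcher matcher = AKKA_ADDRESS.matcher(logText);
        while (matcher.find()) {
            result.add(Integer.parseInt(matcher.group(3)));
        }
        return result;
    }

    public static void main(String[] args) {
        String log =
                "Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093"
                        + " (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager\n"
                        + "Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552"
                        + " (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager";
        int low = 50100;
        int high = 50200; // taskmanager.rpc.port as set in the thread
        for (int port : ports(log)) {
            String verdict = (port >= low && port <= high) ? "inside" : "outside";
            System.out.println(port + " is " + verdict + " " + low + "-" + high);
        }
    }
}
```

For the two registration lines quoted above, both ports (34718 and 37120) are reported as outside 50100-50200.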

Thanks,
Milind 



On Thu, Feb 6, 2020 at 5:25 PM Yang Wang <[hidden email]> wrote:
Maybe you forget to limit the blob server port(blob.server.port) to the range.


Best,
Yang

Milind Vaidya <[hidden email]> 于2020年2月7日周五 上午7:03写道:
I figured out that it was problem with the ports. 39493/34094 were not accessible. So to get this working I opened all the ports 0-65535 for the security group.

How do I control that if I want to open only certain range of ports ?

Is "taskmanager.rpc.port" the right parameter to set ? I did try and set this to certain port range, but did not work.

Thanks 
Milind 

On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <[hidden email]> wrote:



The  cluster is set up on AWS with 1 Job manager and 2 task managers.
They all belong to same security group with 6123, 8081, 50100 - 50200 ports having access granted

Job manager config is as follows :

FLINK_PLUGINS_DIR                       :   /usr/local/flink-1.9.1/plugins
io.tmp.dirs                             :   /tmp/flink
jobmanager.execution.failover-strategy  :   region
jobmanager.heap.size                    :   1024m
jobmanager.rpc.address                  :   10.0.16.10
jobmanager.rpc.port                     :   6123
jobstore.cache-size                     :   52428800
jobstore.expiration-time                :   3600
parallelism.default                     :   4
slot.idle.timeout                       :   50000
slot.request.timeout                    :   300000
task.cancellation.interval              :   30000
task.cancellation.timeout               :   180000
task.cancellation.timers.timeout        :   7500
taskmanager.exit-on-fatal-akka-error    :   false
taskmanager.heap.size                   :   1024m
taskmanager.network.bind-policy         :   "ip"
taskmanager.numberOfTaskSlots           :   2
taskmanager.registration.initial-backoff:   500ms
taskmanager.registration.timeout        :   5min
taskmanager.rpc.port                    :   50100-50200
web.tmpdir                              :   /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0



I have summarised the more details in a stack overflow question where it is easier to put the various details.

On Wed, Feb 5, 2020 at 2:25 AM Robert Metzger <[hidden email]> wrote:
Hi,

I don't think this is a bug. It looks like the machines can not talk to each other. Can you validate that all the machines can talk to each other on the ports used by Flink (6123, 8081, ...) 
If that doesn't help:
- How is the network set up?
- Are you running physical machines / VMs / containers? 
- Is there a firewall involved?

Best,
Robert


On Fri, Jan 31, 2020 at 7:25 PM Milind Vaidya <[hidden email]> wrote:
Hi 

I am trying to build a cluster for flink with 1 master and 2 workers.
The program is working fine locally. The messages are read from Kafka and just printed on STDOUT.

The cluster is successfully created and UI is also shows all config. But the job fails to execute on the cluster.

Here are few exceptions I see in the log files 

File : flink-root-standalonesession

2020-01-29 19:55:00,348 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 INFO  akka.remote.transport.ProtocolStateActor                      - No response from remote for outbound association. Associate timed out after [20000 ms].
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:39493] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:39493]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,350 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink-metrics@ip:34094] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink-metrics@ip:34094]] Caused by: [No response from remote for outbound association. Associate timed out after [20000 ms].]
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:39493
2020-01-29 19:55:00,359 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /ip:34094
2020-01-29 19:58:21,880 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler  - Failed to transfer file from TaskExecutor a7abe6e294fa3ae4129fd695f7309a36.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#5385019]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.LocalFencedMessage]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
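The remote ports that could not be reached can be read off the association warnings above. Note that they point at the flink-metrics actor system rather than the main flink one. A minimal sketch for pulling them out of a whole log file, assuming one log entry per line in the format shown (the class AkkaEndpointExtractor is hypothetical):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: lists remote endpoints from Akka "Association ... has failed" warnings. */
final class AkkaEndpointExtractor {

    // akka.tcp://<actor-system>@<host>:<port>, e.g. akka.tcp://flink-metrics@ip:39493
    private static final Pattern AKKA_ADDRESS =
            Pattern.compile("akka\\.tcp://([\\w-]+)@([^:\\s\\]/]+):(\\d+)");

    /** Returns "system@host:port" for each failed association, de-duplicated, in log order. */
    static Set<String> failedEndpoints(List<String> logLines) {
        Set<String> endpoints = new LinkedHashSet<>();
        for (String line : logLines) {
            // Only these warnings carry the full akka.tcp:// address of the remote system
            if (!line.contains("Association with remote system") || !line.contains("has failed")) {
                continue;
            }
            Matcher m = AKKA_ADDRESS.matcher(line);
            if (m.find()) { // the first address on the line is the remote system
                endpoints.add(m.group(1) + "@" + m.group(2) + ":" + m.group(3));
            }
        }
        return endpoints;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the JobManager (standalonesession) log file
        List<String> lines = Files.readAllLines(Paths.get(args[0]), StandardCharsets.UTF_8);
        failedEndpoints(lines).forEach(System.out::println);
    }
}
```

On the warnings shown above it would report flink-metrics@ip:39493 and flink-metrics@ip:34094, which are the ports to compare against the security-group rules.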


File : flink-root-client-ip


2020-01-29 19:48:10,566 WARN  org.apache.flink.client.cli.CliFrontend                       - Could not load CLI class org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1187)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1147)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1072)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more
2020-01-29 19:48:10,663 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-01-29 19:48:10,856 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,874 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-01-29 19:48:10,875 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2020-01-29 19:48:10,881 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2020-01-29 19:48:10,965 INFO  org.apache.flink.configuration.Configuration                  - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
2020-01-29 19:48:11,160 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2020-01-29 19:48:11,163 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: false)
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, ip
2020-01-29 19:48:11,306 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 2
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 4
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.execution.failover-strategy, region
2020-01-29 19:48:11,307 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: io.tmp.dirs, /tmp/flink
2020-01-29 19:48:11,311 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job 4f4cce35db3f37cae310f272ec88a303 (detached: false).
2020-01-29 20:05:13,170 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2020-01-29 20:05:13,172 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2020-01-29 20:05:13,172 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 4f4cce35db3f37cae310f272ec88a303)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
        at com.saavn.flink.SongCountStreamingJob.main(SongCountStreamingJob.java:79)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
        ... 18 more
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id a7abe6e294fa3ae4129fd695f7309a36 timed out.
        at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1149)
        at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)


Flink version : flink-1.9.1
OS : CentOS Linux release 7.6.1810 (Core)

Is this related to this issue:

Can somebody shed some light on this?




Re: Failed to transfer file from TaskExecutor : Vanilla Flink Cluster

rmetzger0
Hey Milind,

Can you also set metrics.internal.query-service.port to the range?


Best,
Robert
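Each of these port options needs its own range opened in the security group, so it can help to check the configured values against the rules before restarting the cluster. The sketch below is a hypothetical helper, not part of Flink. It parses port specs in the format Flink accepts for these options (single ports, ranges like 50100-50200, or comma-separated combinations) and reports options whose ports do not fall inside an opened range. The example values combine the security-group ports from the Feb 5 message with the settings from the Feb 7 message. "0" (the default for taskmanager.rpc.port, blob.server.port and metrics.internal.query-service.port) means a random port.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical audit of Flink port options against opened firewall ranges. */
final class PortRangeAudit {

    /** Inclusive port range [lo, hi]. */
    static final class Range {
        final int lo, hi;

        Range(int lo, int hi) {
            if (lo < 0 || hi > 65535 || lo > hi) throw new IllegalArgumentException(lo + "-" + hi);
            this.lo = lo;
            this.hi = hi;
        }

        boolean contains(Range other) { return lo <= other.lo && other.hi <= hi; }
    }

    /** Parses "6123", "50100-50200" or combinations such as "50100,50101,50300-50400". */
    static List<Range> parse(String spec) {
        List<Range> ranges = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            if (dash < 0) {
                int port = Integer.parseInt(p);
                ranges.add(new Range(port, port));
            } else {
                ranges.add(new Range(Integer.parseInt(p.substring(0, dash).trim()),
                                     Integer.parseInt(p.substring(dash + 1).trim())));
            }
        }
        return ranges;
    }

    /** Returns option names with at least one range not inside a single opened range. */
    static List<String> uncovered(Map<String, String> options, List<Range> opened) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : options.entrySet()) {
            for (Range r : parse(e.getValue())) {
                // Port 0 asks the OS for a random port, which no fixed rule can cover
                boolean covered = r.lo != 0 && opened.stream().anyMatch(o -> o.contains(r));
                if (!covered) {
                    problems.add(e.getKey());
                    break;
                }
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<Range> opened = parse("6123,8081,50100-50200");
        Map<String, String> flinkPorts = new LinkedHashMap<>();
        flinkPorts.put("jobmanager.rpc.port", "6123");
        flinkPorts.put("rest.port", "8081");
        flinkPorts.put("taskmanager.rpc.port", "50100-50200");
        flinkPorts.put("blob.server.port", "50201-50300");
        flinkPorts.put("metrics.internal.query-service.port", "0");
        System.out.println("Not covered: " + uncovered(flinkPorts, opened));
    }
}
```

Running main prints `Not covered: [blob.server.port, metrics.internal.query-service.port]`: with those values the blob range sits just above the opened 50100-50200 range, and the metrics query service would still pick a random port. The check is deliberately conservative, since each configured range must fit inside a single opened range.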


On Fri, Feb 7, 2020 at 8:35 PM Milind Vaidya <[hidden email]> wrote:
I tried setting that option, but it did not work.

2020-02-07 19:28:45,999 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 32fb9e7dcc9d41917bce38a2d5bb0093 (akka.tcp://flink@ip-1:34718/user/taskmanager_0) at ResourceManager
2020-02-07 19:28:46,425 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering TaskManager with ResourceID 8c402a9c039d3c33466631510c48b552 (akka.tcp://flink@ip-2:37120/user/taskmanager_0) at ResourceManager

My settings are as follows:

taskmanager.rpc.port : 50100-50200
blob.server.port : 50201-50300

So how do I control the port for the TaskManager? In spite of the above settings, the task managers are registering on ports 34718 and 37120.

Thanks,
Milind 



On Thu, Feb 6, 2020 at 5:25 PM Yang Wang <[hidden email]> wrote:
Maybe you forgot to limit the blob server port (blob.server.port) to the range.


Best,
Yang

On Fri, Feb 7, 2020 at 7:03 AM Milind Vaidya <[hidden email]> wrote:
I figured out that it was a problem with the ports: 39493 and 34094 were not accessible. To get this working, I opened all ports 0-65535 in the security group.

How do I control this if I want to open only a certain range of ports?

Is "taskmanager.rpc.port" the right parameter to set? I did try setting it to a certain port range, but it did not work.

Thanks 
Milind 

On Wed, Feb 5, 2020 at 11:22 AM Milind Vaidya <[hidden email]> wrote:



The cluster is set up on AWS with 1 JobManager and 2 TaskManagers.
They all belong to the same security group, with access granted on ports 6123, 8081 and 50100-50200.

The JobManager config is as follows:

FLINK_PLUGINS_DIR                       :   /usr/local/flink-1.9.1/plugins
io.tmp.dirs                             :   /tmp/flink
jobmanager.execution.failover-strategy  :   region
jobmanager.heap.size                    :   1024m
jobmanager.rpc.address                  :   10.0.16.10
jobmanager.rpc.port                     :   6123
jobstore.cache-size                     :   52428800
jobstore.expiration-time                :   3600
parallelism.default                     :   4
slot.idle.timeout                       :   50000
slot.request.timeout                    :   300000
task.cancellation.interval              :   30000
task.cancellation.timeout               :   180000
task.cancellation.timers.timeout        :   7500
taskmanager.exit-on-fatal-akka-error    :   false
taskmanager.heap.size                   :   1024m
taskmanager.network.bind-policy         :   "ip"
taskmanager.numberOfTaskSlots           :   2
taskmanager.registration.initial-backoff:   500ms
taskmanager.registration.timeout        :   5min
taskmanager.rpc.port                    :   50100-50200
web.tmpdir                              :   /tmp/flink-web-74cce811-17c0-411e-9d11-6d91edd2e9b0


