YARN JobManager HA using wrong network interface

Maximilian Bode
Hi everyone,

we are trying to get JobManager HA to work in the context of a per-job YARN session using the 1.0.0-rc3 from a few days ago and are running into a problem concerning task managers with several network interfaces.

After manually killing the job manager process, the jobmanager.log on the newly allocated second job manager reads:
---
2016-03-02 18:01:09,635 WARN  Remoting                                                      - Tried to associate with unreachable remote address [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: /10.127.68.136:34811
2016-03-02 18:01:09,644 WARN  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/), Path(/user/jobmanager)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
---
where the unreachable IP belongs to the old job manager. Up to this point, is this the expected behavior?

The problem then arises on a new task manager, which also unsuccessfully tries to connect to the old job manager. The ZooKeeperLeaderRetrievalService starts cycling through the available network interfaces, as can be seen in the relevant taskmanager.log:
---
2016-03-02 18:01:13,636 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService.
2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils            - Trying to select the network interface and address to use by connecting to the leading JobManager.
2016-03-02 18:01:13,646 INFO  org.apache.flink.runtime.util.LeaderRetrievalUtils            - TaskManager will try to connect for 10000 milliseconds before falling back to heuristics
2016-03-02 18:01:13,712 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Retrieved new target address /10.127.68.136:34811.
2016-03-02 18:01:14,079 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Trying to connect to address /10.127.68.136:34811
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/10.120.193.110': Connection refused
2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/127.0.0.1': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/10.120.193.110': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/10.127.68.136': Connection refused
2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils                  - Failed to connect from address '/127.0.0.1': Connection refused
---
After five repetitions, the task manager stops trying to retrieve the leader and, using the HEURISTIC strategy, ends up using eth1 (10.120.193.110) from then on:
---
2016-03-02 18:01:23,650 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Stopping ZooKeeperLeaderRetrievalService.
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x25229757cff035b closed
2016-03-02 18:01:23,664 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110) for communication.
---
Following this, the new JobManager is discovered and the TaskManager is able to register with it using eth1. The problem is that connections TO eth1 are not possible, so Flink should always use eth0. The exception we later see is:
---
java.io.IOException: Connecting the channel failed: Connecting to remote task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has failed. This might indicate that the remote task manager has been lost.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
at org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
---
The root cause seems to be that the network interface selection still uses the old JobManager location and hence cannot choose the right interface. In particular, the iteration order over the network interfaces seems to differ between the HEURISTIC and SLOW strategies, which then leads to the wrong interface being selected.

Cheers,
 Max
— 
Maximilian Bode * Junior Consultant * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: YARN JobManager HA using wrong network interface

Ufuk Celebi
Hey Max!

Regarding the first WARN in
org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
expected if the new leader has not updated ZooKeeper yet. The
important thing is that the new leading job manager is eventually
retrieved. This did happen, right?

Regarding eth1 vs. eth0: After the new job manager becomes leader, the
task manager should re-try connecting to it with the same strategy as
in the initial connection establishment (i.e. try SLOW first and only
fall back to HEURISTIC). Can you see in the logs whether this happens?

The best thing would be to share the complete logs. Is this possible?
If not publicly, feel free to send them to me privately (uce at apache
org).

– Ufuk



Re: YARN JobManager HA using wrong network interface

Ufuk Celebi
I had an offline chat with Till about this. He pointed out that the address is chosen once at startup (while the task manager is not able to connect to the old job manager) and then stays fixed at eth1.

You can increase the lookup timeout by setting akka.lookup.timeout to a higher value (like 100 s). This is the only workaround I'm aware of at this point. Maybe Till can chime in on whether this has other implications as well?

– Ufuk


Re: YARN JobManager HA using wrong network interface

Till Rohrmann

Hi Max,

the problem is that before starting the TM, we have to find the network interface which is reachable by the other machines. So what we do is connect to the current JobManager. If it happens, as in your case, that the JobManager just died and the new JM address has not yet been written to ZooKeeper, then the TMs don't have much choice other than using the heuristic.

I can't really tell why eth1 is chosen over eth0. The condition is that the interface address is an Inet4Address that is neither a link-local address nor a loopback address.
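
If you want to check which addresses on your machines satisfy this condition, and in which order the JVM enumerates them, a small standalone diagnostic along the following lines should do (just a sketch, not the code Flink actually runs):
---
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.Collections;

public class InterfaceCheck {
    public static void main(String[] args) throws Exception {
        // List all interfaces in the order the JVM reports them and mark the
        // addresses that are IPv4, not link-local and not loopback.
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                boolean candidate = addr instanceof Inet4Address
                        && !addr.isLinkLocalAddress()
                        && !addr.isLoopbackAddress();
                System.out.printf("%-6s %-40s candidate=%b%n",
                        nic.getName(), addr.getHostAddress(), candidate);
            }
        }
    }
}
---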

Thus, Ufuk's suggestion to increase akka.lookup.timeout seems to be the easiest way to solve your problem. I've checked: the default value is 10 s, which might be a bit too low for restarting a new JM and publishing its address via ZooKeeper.

Cheers,
Till



Re: YARN JobManager HA using wrong network interface

Stephan Ewen
If the TaskManager cannot connect to the JobManager, it will use the interface that is bound to the machine's host name ("InetAddress.getLocalHost()").

So, the best way to fix this would be to make sure that all machines have a proper network configuration. Then Flink would either use an address that can connect (via trying various interfaces), or it would fall back to the hostname/interface that is configured on the machine.
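
A quick way to check what that fallback resolves to on a given machine is something like the following sketch; if it prints the eth1 address, the host name is bound to the wrong interface in the OS configuration:
---
import java.net.InetAddress;

public class LocalHostCheck {
    public static void main(String[] args) throws Exception {
        // The address the machine's configured host name resolves to; this is
        // what the TaskManager falls back to when it cannot reach the JobManager.
        InetAddress localHost = InetAddress.getLocalHost();
        System.out.println("hostname: " + localHost.getHostName());
        System.out.println("address : " + localHost.getHostAddress());
    }
}
---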



Re: YARN JobManager HA using wrong network interface

Till Rohrmann

If I'm not mistaken, the heuristic does not necessarily return InetAddress.getLocalHost(). It selects the first network interface satisfying the aforementioned conditions, but before returning it, it tries one last time to connect to the JM via the interface bound to InetAddress.getLocalHost(). Only if this fails is the heuristically selected network interface returned.
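
As a rough, self-contained sketch of that order (this is not the actual ConnectionUtils code; the JobManager address and the canReach helper are only placeholders for illustration):
---
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.util.Collections;

public class AddressPickSketch {
    // Placeholder for the real connect attempt: try to reach the JM from the
    // given local address within a short timeout.
    static boolean canReach(InetAddress local, InetSocketAddress jobManager) {
        try (Socket s = new Socket()) {
            s.bind(new InetSocketAddress(local, 0));
            s.connect(jobManager, 1000);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // In reality the address comes from ZooKeeper; this is an example value.
        InetSocketAddress jm = new InetSocketAddress("10.127.68.136", 34811);

        // Heuristic pick: first IPv4 address that is neither link-local nor loopback.
        InetAddress heuristicPick = null;
        outer:
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress a : Collections.list(nic.getInetAddresses())) {
                if (a instanceof Inet4Address && !a.isLinkLocalAddress() && !a.isLoopbackAddress()) {
                    heuristicPick = a;
                    break outer;
                }
            }
        }

        // Last attempt via the address behind the machine's host name; only if
        // that cannot reach the JM is the heuristic pick used.
        InetAddress localHost = InetAddress.getLocalHost();
        InetAddress chosen = canReach(localHost, jm) ? localHost : heuristicPick;
        System.out.println("Would use: " + chosen);
    }
}
---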






Re: YARN JobManager HA using wrong network interface

Stephan Ewen
Okay, then that is a change from the original behavior that was introduced with HA. Originally, if the connection attempts failed, it always returned the InetAddress.getLocalHost() interface.
I think we should change it back to that, because that interface is by far the best possible heuristic.
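To make concrete what "the interface bound to the machine's host name" means: it is simply whatever the JDK resolves for InetAddress.getLocalHost(). A quick check (plain JDK, nothing Flink-specific) would be:
---
import java.net.InetAddress;

// Prints the configured host name and the address the JDK associates with it.
// On a machine whose primary host name is bound to eth0 (as in Max's setup),
// this should print the eth0 address.
public class LocalHostCheck {
    public static void main(String[] args) throws Exception {
        InetAddress localHost = InetAddress.getLocalHost();
        System.out.println(localHost.getHostName() + " -> " + localHost.getHostAddress());
    }
}
---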






Re: YARN JobManager HA using wrong network interface

Till Rohrmann-2
No, I don't think this behaviour was introduced by HA; it has been the default behaviour for a long time. If you think we should still change it, then I can open an issue for it.







Re: YARN JobManager HA using wrong network interface

Maximilian Bode
Hi Ufuk, Till and Stephan,

Yes, that is what we observed. The primary hostname, i.e. the one returned by the Unix hostname command, is in fact bound to the eth0 interface, whereas Flink ends up using the eth1 interface (which belongs to a different hostname).

Changing akka.lookup.timeout to 100 s seems to fix the problem, as the new job manager now becomes available in time. I would still agree with Stephan that falling back to the local hostname should be the preferred strategy.
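For reference, the workaround boils down to a single entry in our flink-conf.yaml (the value is simply the generous timeout mentioned above, not a tuned recommendation):
---
# Give the task managers more time to look up the new leading JobManager
# before they fall back to the network interface heuristic.
akka.lookup.timeout: 100 s
---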

Cheers,
 Max
— 
Maximilian Bode * Junior Consultant * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082









Re: YARN JobManager HA using wrong network interface

Till Rohrmann
I've created an issue [1] and opened a PR [2] to fix it.


Cheers,
Till


On Thu, Mar 3, 2016 at 12:33 PM, Maximilian Bode <[hidden email]> wrote:
Hi Ufuk, Till and Stephan,

Yes, that is what we observed. The primary hostname, i.e. the one returned by the unix hostname command, is in fact bound to the eth0 interface, whereas Flink uses the eth1 interface (pertaining to another hostname).

Changing akka.lookup.timeout to 100 s seems to fix the problem as now the new job manager is available in sufficient time. I still would agree with Stephan on taking the local hostname being the preferred strategy.

Cheers,
 Max
— 
Maximilian Bode * Junior Consultant * [hidden email]
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Am 03.03.2016 um 12:29 schrieb Till Rohrmann <[hidden email]>:

No I don't think this behaviour has been introduced by HA. That is the default behaviour we used for a long time. If you think we should still change it, then I can open an issue for it.

On Thu, Mar 3, 2016 at 12:20 PM, Stephan Ewen <[hidden email]> wrote:
Okay, that is a change from the original behavior, introduced in HA. Originally, if the connection attempts failed, it always returned the InetAddress.getLocalHost() interface.
I think we should change it back to that, because that interface is by far the best possible heuristic.

On Thu, Mar 3, 2016 at 11:39 AM, Till Rohrmann <[hidden email]> wrote:

If I’m not mistaken, then it’s not necessarily true that the heuristic returns InetAddress.getLocalHost() in all cases. The heuristic will select the first network interface with the afore-mentioned conditions but before returning it, it will try a last time to connect to the JM via the interface bound to InetAddress.getLocalHost(). However, if this fails, then the heuristically selected network interface will be returned.


On Thu, Mar 3, 2016 at 10:49 AM, Stephan Ewen <[hidden email]> wrote:
If the ThasManager cannot connect to the JobManager, it will use the interface that is bound to the machine's host name ("InetAddress.getLocalHost()").

So, the best way to fix this would be to make sure that all machines have a proper network configuration. Then Flink would either use an address that can connect (via trying various interfaces), or it would default back to the hostname/interface that is configured on the machine.


On Thu, Mar 3, 2016 at 10:43 AM, Till Rohrmann <[hidden email]> wrote:

Hi Max,

the problem is that before starting the TM, we have to find the network interface which is reachable by the other machines. So what we do is to connect to the current JobManager. If it should happen, as in your case, that the JobManager just died and the new JM address has not been written to ZooKeeper, then the TMs don’t have much choice other than using the heuristic.

I can’t really tell why eth1 is chosen over eth0. The condition is that the interface address is an Inet4Address, no link local address as well as not a loopback address.

Thus, Ufuk’s solution, to increase akka.lookup.timeout seems to be the easiest solution to solve your problem. I’ve checked the default value is set to 10 s which might be a bit too low for restarting a new JM and publishing its address via ZooKeeper.

Cheers,
Till


On Thu, Mar 3, 2016 at 10:28 AM, Ufuk Celebi <[hidden email]> wrote:
I had an offline chat with Till about this. He pointed out that the
address is chosen once at start up time (while not being able to
connect to the old job manager) and then it stays fixed at eth1.

You can increase the lookup timeout by setting akka.lookup.timeout to
a higher value (like 100 s). This is the only workaroud I'm aware of
at this point. Maybe Till can chime in here whether this has other
implications as well?

– Ufuk

On Thu, Mar 3, 2016 at 9:59 AM, Ufuk Celebi <[hidden email]> wrote:
> Hey Max!
>
> for the first WARN in
> org.apache.flink.runtime.webmonitor.JobManagerRetriever: this is
> expected if the new leader has not updated ZooKeeper yet. The
> important thing is that the new leading job manager is eventually
> retrieved. This did happen, right?
>
> Regarding eth1 vs. eth0: After the new job manager becomes leader, the
> task manager should re-try connecting to it with the same strategy as
> in the initial connection establishment (e.g. try SLOW first and only
> fall back to HEURISTIC). Can you see in the logs whether this happens?
>
> The best thing would be to share the complete logs. Is this possible?
> If not publicly, feel free to send it to me privately (uce at apache
> org).
>
> – Ufuk
>
>
> On Thu, Mar 3, 2016 at 9:21 AM, Maximilian Bode
> <[hidden email]> wrote:
>> Hi everyone,
>>
>> we are trying to get to work JobManager HA in the context of a per-job YARN
>> session using the 1.0.0-rc3 from a few days ago and are having a problem
>> concerning task managers with several network interfaces.
>>
>> After manually killing the job manager process, the jobmanager.log on the
>> newly allocated second job manager reads:
>> ---
>> 2016-03-02 18:01:09,635 WARN  Remoting
>> - Tried to associate with unreachable remote address
>> [akka.tcp://flink@10.127.68.136:34811]. Address is now gated for 5000 ms,
>> all messages to this address will be delivered to dead letters. Reason:
>> Connection refused: /10.127.68.136:34811
>> 2016-03-02 18:01:09,644 WARN
>> org.apache.flink.runtime.webmonitor.JobManagerRetriever       - Failed to
>> retrieve leader gateway and port.
>> akka.actor.ActorNotFound: Actor not found for:
>> ActorSelection[Anchor(akka.tcp://flink@10.127.68.136:34811/),
>> Path(/user/jobmanager)]
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
>> at
>> akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>> at
>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
>> at
>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
>> at
>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>> at
>> akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
>> at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
>> at
>> akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
>> at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
>> at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
>> at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
>> at
>> akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
>> at akka.remote.EndpointWriter.postStop(Endpoint.scala:561)
>> at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
>> at akka.remote.EndpointActor.aroundPostStop(Endpoint.scala:415)
>> at
>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>> at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>> at akka.actor.ActorCell.terminate(ActorCell.scala:369)
>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
>> at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:279)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> ---
>> where the IP not found is from the old job manager. So far, is this the
>> expected behavior?
>>
>> The problem then arises on a new task manager, which also tries to connect
>> to the old job manager unsuccessfully. The ZooKeeperLeaderRetrievalService
>> starts cycling through the available network interfaces, as can be seen in
>> the relevant taskmanager.log:
>> ---
>> 2016-03-02 18:01:13,636 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Starting ZooKeeperLeaderRetrievalService.
>> 2016-03-02 18:01:13,646 INFO
>> org.apache.flink.runtime.util.LeaderRetrievalUtils            - Trying to
>> select the network interface and address to use by connecting to the leading
>> JobManager.
>> 2016-03-02 18:01:13,646 INFO
>> org.apache.flink.runtime.util.LeaderRetrievalUtils            - TaskManager
>> will try to connect for 10000 milliseconds before falling back to heuristics
>> 2016-03-02 18:01:13,712 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Retrieved new target address /10.127.68.136:34811.
>> 2016-03-02 18:01:14,079 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Trying to connect to address /10.127.68.136:34811
>> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address
>> 'task.manager.eth0.hostname.com/10.127.68.136': Connection refused
>> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/10.127.68.136': Connection refused
>> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/10.120.193.110': Connection refused
>> 2016-03-02 18:01:14,082 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/10.127.68.136': Connection refused
>> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/127.0.0.1': Connection refused
>> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/10.120.193.110': Connection refused
>> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/10.127.68.136': Connection refused
>> 2016-03-02 18:01:14,083 INFO  org.apache.flink.runtime.net.ConnectionUtils
>> - Failed to connect from address '/127.0.0.1': Connection refused
>> ---
>> After five repetitions, the task manager stops trying to retrieve the leader
>> and using the HEURISTIC strategy ends up using  eth1 (10.120.193.110) from
>> now on:
>> ---
>> 2016-03-02 18:01:23,650 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Stopping ZooKeeperLeaderRetrievalService.
>> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ClientCnxn
>> - EventThread shut down
>> 2016-03-02 18:01:23,655 INFO  org.apache.zookeeper.ZooKeeper
>> - Session: 0x25229757cff035b closed
>> 2016-03-02 18:01:23,664 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager
>> will use hostname/address 'task.manager.eth1.hostname.com' (10.120.193.110)
>> for communication.
>> ---
>> Following the new jobmanager is discovered and the taskmanager is able to
>> register at the jobmanager using eth1. The problem is that connections TO
>> eth1 are not possible. So flink should always use eth0. The exception we
>> later see is:
>> ---
>> java.io.IOException: Connecting the channel failed: Connecting to remote
>> task manager + 'other.task.manager.eth1.hostname/10.120.193.111:46620' has
>> failed. This might indicate that the remote task manager has been lost.
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:196)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:131)
>> at
>> org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:83)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:60)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:388)
>> at
>> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:411)
>> at
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>> at
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:744)
>> ---
>> The root cause seems to be that network interface selection is still using
>> the old jobmanager location and hence is not able to choose the right
>> interface. In particular, it seems that iteration order over the network
>> interfaces differs between the HEURISTIC and SLOW strategy, which then leads
>> to the wrong interface being selected.
>>
>> Cheers,
>>  Max
>> —
>> Maximilian Bode * Junior Consultant * [hidden email]
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>