Weird behavior in actorSystem shutdown in akka


Weird behavior in actorSystem shutdown in akka

Joshua Fan
Hi, Till and users,

We are seeing some weird behavior during ActorSystem shutdown in Akka on our Flink platform.
We run Flink 1.4.2 on YARN, and we use a long-running agent, based on YarnClient, to submit Flink jobs to YARN. Users can connect to the agent, submit a job, and disconnect, but the agent itself is always there. So each time a user submits a job, an ActorSystem is created, and after the job has been submitted successfully in detached mode, that ActorSystem is shut down.
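To make the lifecycle concrete, here is a minimal sketch of what the agent does per submission (the names and the submit callback are illustrative, not our real agent code; it assumes the Akka 2.4.x that ships with Flink 1.4):

import akka.actor.ActorSystem
import scala.concurrent.Await
import scala.concurrent.duration._

// One ActorSystem per job submission, shut down right after the detached submit.
def submitDetached(submit: ActorSystem => Unit): Unit = {
  val system = ActorSystem("flink-submission-client")
  try {
    submit(system)                                    // submit the job to YARN in detached mode
  } finally {
    system.terminate()                                // ask Akka to shut the system down
    Await.result(system.whenTerminated, 30.seconds)   // wait for remoting to flush and stop
  }
}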
The weird thing is that an Akka error message shows up in the JM log two days later (2 days being Akka's default value for quarantine-after-silence), like below.

2018-11-19 09:30:34.212 [flink-akka.actor.default-dispatcher-2] ERROR akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-5 - Association to [akka.tcp://[hidden email]:35767] with UID [-1757115446] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

In the above, client01v*** is the host that runs the agent, and the error shows up randomly. We trigger a savepoint from the agent every half hour, so an ActorSystem is created and shut down each time, but only about 1 in 50 of these shutdowns raises an error like the one above.

I think this is related to Akka itself. I checked the Akka code and found some clues.
For the shutdowns where no error is raised two days later, the JM log looks like this:

2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-23 - Association between local [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:56906] was disassociated because the ProtocolStateActor failed: Shutdown
2018-11-17 04:31:09.208 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-23 - Association between local [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:56906] was disassociated because the ProtocolStateActor failed: Shutdown
2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 - Remote system with address [akka.tcp://flink@xxxx:41769] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
2018-11-17 04:31:09.209 [flink-akka.actor.default-dispatcher-17] DEBUG akka.remote.Remoting flink-akka.remote.default-remote-dispatcher-15 - Remote system with address [akka.tcp://flink@xxxx:41769] has shut down. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.

It seems the remote actor receives the shutdown notification, and the Akka messages may flow like this:
1. The agent shuts down its ActorSystem.
2. The EndpointReader in the JM receives an AssociationHandle.Shutdown and rethrows it as a ShutDownAssociation, and the EndpointWriter does a publishAndThrow of the ShutDownAssociation again.
3. When the ReliableDeliverySupervisor in the JM gets the AssociationProblem reported by the EndpointWriter, it throws it as well.
4. When the EndpointManager in the JM gets the ShutDownAssociation exception, it stops the actor.

But for the shutdowns that later raise the silence error, the JM log looks like this; it seems the remote actor did not receive the shutdown notification:

2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-14 - Association between local [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:45103] was disassociated because the ProtocolStateActor failed: Unknown
2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] DEBUG akka.remote.transport.ProtocolStateActor flink-akka.remote.default-remote-dispatcher-14 - Association between local [tcp://flink@yyyy:29448] and remote [tcp://flink@xxxx:45103] was disassociated because the ProtocolStateActor failed: Unknown
2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-20 - Association with remote system [akka.tcp://flink@xxxx:35767] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2018-11-17 09:30:29.177 [flink-akka.actor.default-dispatcher-4] WARN  akka.remote.ReliableDeliverySupervisor flink-akka.remote.default-remote-dispatcher-20 - Association with remote system [akka.tcp://flink@xxxx:35767] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 

My guess is that the Akka messages flow like this:
1. The agent shuts down its ActorSystem.
2. The EndpointReader in the JM receives an AssociationHandle.Unknown instead of AssociationHandle.Shutdown, so the EndpointReader stops, and the EndpointWriter gets a Terminated message and throws an EndpointDisassociatedException.
3. The ReliableDeliverySupervisor treats the EndpointDisassociatedException as a NonFatal exception; it does some cleanup and should eventually stop, but I think it may not actually stop.
See the code here:
case NonFatal(e) ⇒
      val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]"
      log.warning(
        "Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
        remoteAddress, settings.RetryGateClosedFor.toMillis, e.getMessage, causedBy)
      uidConfirmed = false // Need confirmation of UID again
      if ((resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) && bailoutAt.isEmpty)
        bailoutAt = Some(Deadline.now + settings.InitialSysMsgDeliveryTimeout)
      context.become(gated(writerTerminated = false, earlyUngateRequested = false))
      currentHandle = None
      context.parent ! StoppedReading(self)
      Stop

But the silence error can only be raised if the supervisor has gone to the idle state, via goToIdle:
private def goToIdle(): Unit = {
    if (maxSilenceTimer.isEmpty)
      maxSilenceTimer = Some(context.system.scheduler.scheduleOnce(settings.QuarantineSilentSystemTimeout, self, TooLongIdle))
    context.become(idle)
  }
and after two days:
 def idle: Receive = {
    case IsIdle ⇒ sender() ! Idle
    case s: Send ⇒
      writer = createWriter()
      // Resending will be triggered by the incoming GotUid message after the connection finished
      handleSend(s)
      goToActive()
    case AttemptSysMsgRedelivery ⇒
      if (resendBuffer.nacked.nonEmpty || resendBuffer.nonAcked.nonEmpty) {
        writer = createWriter()
        // Resending will be triggered by the incoming GotUid message after the connection finished
        goToActive()
      }
    case TooLongIdle ⇒
      throw new HopelessAssociation(localAddress, remoteAddress, uid,
        new TimeoutException("Remote system has been silent for too long. " +
          s"(more than ${settings.QuarantineSilentSystemTimeout.toUnit(TimeUnit.HOURS)} hours)"))
    case EndpointWriter.FlushAndStop ⇒ context.stop(self)
    case EndpointWriter.StopReading(w, replyTo) ⇒
      replyTo ! EndpointWriter.StoppedReading(w)
    case Ungate ⇒ // ok, not gated
  }

You can see that TooLongIdle is what eventually raises the error message. So it looks like the actor should have stopped, but somehow ended up in the idle state instead.

OK, I have tried my best to explain the message flow, and I hope it makes sense.

Here are my questions:
1. Why do most of the shutdowns go fine, while a few raise this error?
2. Are there two connections between the actor systems, one for data and one for system messages? I ask because, as the logs show, there are two ports on the agent side.

Thank you for your patience; this turned into a long post.
If I didn't make the situation clear enough, please let me know.
Thank you all.

Yours
Joshua


Re: Weird behavior in actorSystem shutdown in akka

Till Rohrmann
Hi Joshua,

sorry for getting back to you so late. Personally, I haven't seen this problem before. Without more log context I think I won't be able to help you. This looks a bit more like an Akka problem than a Flink problem to be honest. 

One cause could be that akka.remote.flush-wait-on-shutdown is set too low. But this should only happen when you shut down the remote ActorSystem (JM ActorSystem) and you should see "Shutdown finished, but flushing might not have been successful and some messages might have been dropped. Increase akka.remote.flush-wait-on-shutdown to a larger value to avoid this." in the logs.
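For reference, if you construct the client-side ActorSystem yourself in the agent, you could raise that timeout along these lines (the value and system name are just an example):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Give remoting more time to flush pending writes when the system shuts down (default is 2 s).
val config = ConfigFactory
  .parseString("akka.remote.flush-wait-on-shutdown = 10 s")
  .withFallback(ConfigFactory.load())
val system = ActorSystem("flink-submission-client", config)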

I don't know which two ports you are referring to on the agent side. I think it would also help to share the logs of your agent.

Cheers,
Till
