Random Task executor shutdown

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Random Task executor shutdown

LINZ, Arnaud
(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud

________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
Reply | Threaded
Open this post in threaded view
|

Re: Random Task executor shutdown

Guowei Ma
Hi, Arnaud
Would you like to share the log of the shutdown task executor?
BTW could you check the gc log of the task executor?
Best,
Guowei


On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <[hidden email]> wrote:
(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud

________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
Reply | Threaded
Open this post in threaded view
|

RE: Random Task executor shutdown

LINZ, Arnaud

Hello,

 

Actually the log is more complete when the application ends, and it’s a Zookeeper related issue.

I took another log.

Job Manager’s log:

 

(…)

2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).

2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.

2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

    at akka.actor.ActorCell.invoke(ActorCell.scala:561)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

    at akka.dispatch.Mailbox.run(Mailbox.scala:225)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.

(…)

 

The shutting down task executor log is:

 

(…)

2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up

org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)

(…)

020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).

2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: Closing task slot table

    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

(…)

Then plenty of other errors.

 

Currently I have : high-availability.zookeeper.client.session-timeout: 300000

But between the first error message:

2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

 

and the last one, there is only one minute. Is there other parameters to adjust to make the Zookeeper synchronization more robust when the network is slowed down ?

 

Best,

Arnaud

 

 

De : Guowei Ma <[hidden email]>
Envoyé : mardi 17 novembre 2020 00:49
À : LINZ, Arnaud <[hidden email]>
Cc : user <[hidden email]>
Objet : Re: Random Task executor shutdown

 

Hi, Arnaud

Would you like to share the log of the shutdown task executor?

BTW could you check the gc log of the task executor?

Best,

Guowei

 

 

On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <[hidden email]> wrote:

(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud

________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Reply | Threaded
Open this post in threaded view
|

RE: Random Task executor shutdown

LINZ, Arnaud

Hello,

 

We are wondering whether it is related to https://issues.apache.org/jira/browse/ZOOKEEPER-2775 or not.

What is the version of the shaded zookeeper client in Flink 1.10.0 ?

Best,

Arnaud

 

De : LINZ, Arnaud
Envoyé : mercredi 18 novembre 2020 09:39
À : 'Guowei Ma' <[hidden email]>
Cc : user <[hidden email]>
Objet : RE: Random Task executor shutdown

 

Hello,

 

Actually the log is more complete when the application ends, and it’s a Zookeeper related issue.

I took another log.

Job Manager’s log:

 

(…)

2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).

2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.

2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

    at akka.actor.ActorCell.invoke(ActorCell.scala:561)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

    at akka.dispatch.Mailbox.run(Mailbox.scala:225)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.

(…)

 

The shutting down task executor log is:

 

(…)

2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up

org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)

(…)

020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).

2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: Closing task slot table

    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

(…)

Then plenty of other errors.

 

Currently I have : high-availability.zookeeper.client.session-timeout: 300000

But between the first error message:

2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

 

and the last one, there is only one minute. Is there other parameters to adjust to make the Zookeeper synchronization more robust when the network is slowed down ?

 

Best,

Arnaud

 

 

De : Guowei Ma <[hidden email]>
Envoyé : mardi 17 novembre 2020 00:49
À : LINZ, Arnaud <[hidden email]>
Cc : user <[hidden email]>
Objet : Re: Random Task executor shutdown

 

Hi, Arnaud

Would you like to share the log of the shutdown task executor?

BTW could you check the gc log of the task executor?

Best,

Guowei

 

 

On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <[hidden email]> wrote:

(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud

________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Reply | Threaded
Open this post in threaded view
|

RE: Random Task executor shutdown

LINZ, Arnaud

Hi,

It’s 3.4.10 and does contain the bug. I’ll patch my flink client and see if it happens again.

Best regards,

Arnaud

 

De : LINZ, Arnaud
Envoyé : mercredi 18 novembre 2020 10:35
À : 'Guowei Ma' <[hidden email]>
Cc : 'user' <[hidden email]>
Objet : RE: Random Task executor shutdown

 

Hello,

 

We are wondering whether it is related to https://issues.apache.org/jira/browse/ZOOKEEPER-2775 or not.

What is the version of the shaded zookeeper client in Flink 1.10.0 ?

Best,

Arnaud

 

De : LINZ, Arnaud
Envoyé : mercredi 18 novembre 2020 09:39
À : 'Guowei Ma' <[hidden email]>
Cc : user <[hidden email]>
Objet : RE: Random Task executor shutdown

 

Hello,

 

Actually the log is more complete when the application ends, and it’s a Zookeeper related issue.

I took another log.

Job Manager’s log:

 

(…)

2020-11-12 14:34:09,798 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 73 from task d0d19968414641878c35c392e2062c59 of job b62ad2008af85add0ff9e6680be0cc42 at container_e38_1604477334666_0733_01_000003 @ XXX (dataPort=33692).

2020-11-12 14:34:15,015 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0733_01_000005 because: The TaskExecutor is shutting down.

2020-11-12 14:34:15,072 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (13/15) (4010b02f9f8094b38f182d1f55b9be4b) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

    at akka.actor.ActorCell.invoke(ActorCell.scala:561)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

    at akka.dispatch.Mailbox.run(Mailbox.scala:225)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2020-11-12 14:34:15,111 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_12.

(…)

 

The shutting down task executor log is:

 

(…)

2020-11-12 14:34:14,497 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,497 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:34:14,879 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:34:14,880 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:34:14,881 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,881 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:34:14,886 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 228 with err 0 expected Xid 229 for a packet with details: clientPath:null serverPath:null finished:false header:: 229,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

2020-11-12 14:34:14,986 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: SUSPENDED

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,987 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.

2020-11-12 14:34:14,988 ERROR org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl  - Background operation retry gave up

org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.KeeperException.create(KeeperException.java:99)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.checkBackgroundRetry(CuratorFrameworkImpl.java:728)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:857)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)

    at org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)

    at java.util.concurrent.FutureTask.run(FutureTask.java:266)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)

    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

    at java.lang.Thread.run(Thread.java:748)

(…)

020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96).

2020-11-12 14:34:15,077 INFO  org.apache.flink.runtime.taskmanager.Task                     - Map -> Sink (1/15) (053988ce32e1069ad296fd9835554f96) switched from RUNNING to FAILED.

org.apache.flink.util.FlinkException: Closing task slot table

    at org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl.closeAsync(TaskSlotTableImpl.java:160)

    at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:375)

    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)

(…)

Then plenty of other errors.

 

Currently I have : high-availability.zookeeper.client.session-timeout: 300000

But between the first error message:

2020-11-12 14:33:43,548 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.client.ZooKeeperSaslClient  - Client will use GSSAPI as SASL mechanism.

2020-11-12 14:33:43,563 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181. Will attempt to SASL-authenticate using Login Context section 'Client'

2020-11-12 14:33:43,564 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, initiating session

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, sessionid = 0x175924beaf03acf, negotiated timeout = 60000

2020-11-12 14:33:43,566 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: RECONNECTED

2020-11-12 14:33:43,569 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,570 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.

2020-11-12 14:33:43,592 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session 0x175924beaf03acf for server io-master-u1-n02.bie.jupiter.nbyt.fr/10.136.169.130:2181, unexpected error, closing socket connection and attempting reconnect

java.io.IOException: Xid out of order. Got Xid 37 with err 0 expected Xid 38 for a packet with details: clientPath:null serverPath:null finished:false header:: 38,101  replyHeader:: 0,0,-4  request:: 347892643788,v{'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/resource_manager_lock,'/flink_prd_XXX_StreamFlink/application_1604477334666_0733/leader/b62ad2008af85add0ff9e6680be0cc42/job_manager_lock},v{},v{}  response:: null

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:823)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:94)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)

    at org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

 

and the last one, there is only one minute. Is there other parameters to adjust to make the Zookeeper synchronization more robust when the network is slowed down ?

 

Best,

Arnaud

 

 

De : Guowei Ma <[hidden email]>
Envoyé : mardi 17 novembre 2020 00:49
À : LINZ, Arnaud <[hidden email]>
Cc : user <[hidden email]>
Objet : Re: Random Task executor shutdown

 

Hi, Arnaud

Would you like to share the log of the shutdown task executor?

BTW could you check the gc log of the task executor?

Best,

Guowei

 

 

On Mon, Nov 16, 2020 at 8:57 PM LINZ, Arnaud <[hidden email]> wrote:

(reposted with proper subject line -- sorry for the copy/paste)
-----Original message-----
Hello,

I'm running Flink 1.10 on a yarn cluster. I have a streaming application, that, when under heavy load, fails from time to time with this unique error message in the whole yarn log:

(...)
2020-11-15 16:18:42,202 WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received late message for now expired checkpoint attempt 63 from task 4cbc940112a596db54568b24f9209aac of job 1e1717d19bd8ea296314077e42e1c7e5 at container_e38_1604477334666_0960_01_000004 @ xxx (dataPort=33099).
2020-11-15 16:18:55,043 INFO  org.apache.flink.yarn.YarnResourceManager                     - Closing TaskExecutor connection container_e38_1604477334666_0960_01_000004 because: The TaskExecutor is shutting down.
2020-11-15 16:18:55,087 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Map (7/15) (c8e92cacddcd4e41f51a2433d07d2153) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.

      at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:359)
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:218)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:509)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:175)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-11-15 16:18:55,092 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - Calculating tasks to restart to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
2020-11-15 16:18:55,101 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy  - 230 tasks should be restarted to recover the failed task 2f6467d98899e64a4721f0a7b6a059a8_6.
(...)

What could be the cause of this failure? Why is there no other error message?

I've tried to increase the value of heartbeat.timeout, thinking that maybe it was due to a slow responding mapper, but it did not solve the issue.

Best regards,
Arnaud

________________________________

L'intégrité de ce message n'étant pas assurée sur internet, la société expéditrice ne peut être tenue responsable de son contenu ni de ses pièces jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.