Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Lu Niu
Hi, Flink users

Recently we migrated to Flink 1.11 and are seeing exceptions like:
```
2020-12-15 12:41:01,199 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-event -> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtuple (21/60) (711d1d319691a4b80e30fe6ab7dfab5b) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@50abf386.
java.lang.Exception: Job leader for job id 47b1531f79ffe3b86bc5910f6071e40c lost leadership.
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:1852) ~[nrtg-1.11_deploy.jar:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_212-ga]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:1851) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [nrtg-1.11_deploy.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:539) [nrtg-1.11_deploy.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227) [nrtg-1.11_deploy.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612) [nrtg-1.11_deploy.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:581) [nrtg-1.11_deploy.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268) [nrtg-1.11_deploy.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
```

```
2020-12-15 01:01:39,531 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - USER_MATERIALIZED_EVENT_SIGNAL.user_context.SINK-stream_joiner -> USER_MATERIALIZED_EVENT_SIGNAL-user_context-SINK-SINKS.realpin (260/360) (0c1f4495088ec9452c597f46a88a2c8e) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2362b2fd.
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
at org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[nrtg-1.11_deploy.jar:?]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [nrtg-1.11_deploy.jar:?]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [nrtg-1.11_deploy.jar:?]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [nrtg-1.11_deploy.jar:?]
at akka.actor.Actor$class.aroundReceive(Actor.scala:539) [nrtg-1.11_deploy.jar:?]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227) [nrtg-1.11_deploy.jar:?]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612) [nrtg-1.11_deploy.jar:?]
at akka.actor.ActorCell.invoke(ActorCell.scala:581) [nrtg-1.11_deploy.jar:?]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268) [nrtg-1.11_deploy.jar:?]
at akka.dispatch.Mailbox.run(Mailbox.scala:229) [nrtg-1.11_deploy.jar:?]
```

This happens a few times per week. It seems like one TaskManager wrongly thought the JobManager was lost, which triggered a full restart of the whole job. Does anyone know how to resolve such errors? Thanks!

Best
Lu

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Xintong Song
Hi Lu,

I assume you are using ZooKeeper as the HA service?

A common cause of unexpected leadership loss is instability of the HA service. E.g., if ZK does not receive a heartbeat from the Flink RM for a certain period of time, it will revoke the leadership and notify the other components. You can look into the ZooKeeper logs to check why the RM's leadership was revoked.
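For reference, a minimal sketch of the ZooKeeper HA setup in flink-conf.yaml (the quorum hosts and paths below are placeholders, not a recommendation):

```
# flink-conf.yaml -- minimal ZooKeeper HA setup (placeholder values)
high-availability: zookeeper
# Comma-separated list of the ZooKeeper servers backing the HA services
high-availability.zookeeper.quorum: zk-host1:2181,zk-host2:2181,zk-host3:2181
# Root znode under which Flink keeps its leader latches and leader info
high-availability.zookeeper.path.root: /flink
# Durable storage for JobManager metadata; ZK only holds pointers to it
high-availability.storageDir: hdfs:///flink/ha/
```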

Thank you~

Xintong Song




Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Lu Niu
Hi, Xintong

Thanks for replying and for your suggestion. I did check the ZK side, but there is nothing interesting there. The error message actually shows that only one TM thought the JM lost leadership while the others ran fine. Also, this happened only after we migrated from 1.9 to 1.11. Is it possible this was introduced by 1.11?

Best
Lu


Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Xintong Song
I'm not aware of any significant changes to the HA components between 1.9 and 1.11.
Would you mind sharing the complete jobmanager/taskmanager logs?

Thank you~

Xintong Song




Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Lu Niu
After checking the logs, I found the root cause is a ZK client timeout on the TM:
```
2021-01-25 14:01:49,600 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f
2021-01-25 14:01:49,610 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
2021-01-25 14:01:49,711 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2021-01-25 14:01:49,711 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 27ac39342913d29baac4cde13062c4a4 with leader id b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 27ac39342913d29baac4cde13062c4a4.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7).
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task - Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 27ac39342913d29baac4cde13062c4a4 lost the leadership.
```

I checked that TM's GC log: no GC issues. The ZooKeeper server log also shows the client timeout. How frequently does the ZK client sync with the server side in Flink? The log says the client didn't heartbeat to the server for 40s. Any help? Thanks!

Best
Lu



Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Xintong Song
The ZK client side in Flink uses a 15s connection timeout and a 60s session timeout. There is nothing like a heartbeat interval configured, which I assume is up to ZK's internal implementation. These settings have not changed in Flink since at least 2017.
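For reference, those timeouts map to the following flink-conf.yaml options (a sketch showing the default values):

```
# flink-conf.yaml -- ZooKeeper client timeouts used by Flink's HA services
# (the values below are the defaults, in milliseconds)
high-availability.zookeeper.client.connection-timeout: 15000
high-availability.zookeeper.client.session-timeout: 60000

# Note: the ZooKeeper client internally treats 2/3 of the session timeout
# as its read timeout: 2/3 * 60000 ms = 40000 ms, which matches the
# "have not heard from server in 40020ms" line in the log above.
```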

If both the ZK client and server complain about timeouts, and there is no GC issue spotted, I would suspect network instability.

Thank you~

Xintong Song




Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Xintong Song

There's indeed a ZK version upgrade between 1.9 and 1.11, but I'm not aware of any similar issue reported since the upgrade.
I would suggest the following:
- Turn on DEBUG logging for the ZooKeeper/Curator classes and see whether there are any valuable details (see the sketch below).
- Maybe try asking in the Apache ZooKeeper community whether this is a known issue.
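For example, a sketch of the log4j.properties entries for enabling DEBUG on the shaded ZooKeeper and Curator classes, assuming the log4j2 properties format that Flink 1.11 ships with (the package names are taken from the log lines earlier in this thread):

```
# log4j.properties -- DEBUG logging for Flink's shaded ZooKeeper/Curator
logger.zookeeper.name = org.apache.flink.shaded.zookeeper3.org.apache.zookeeper
logger.zookeeper.level = DEBUG
logger.curator.name = org.apache.flink.shaded.curator4.org.apache.curator
logger.curator.level = DEBUG
```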

Thank you~
Xintong Song






On Sat, Jan 30, 2021 at 6:47 AM Lu Niu <[hidden email]> wrote:
Hi, Xintong

Thanks for replying. Could it relate to the ZK version? We are a platform team at Pinterest, in the middle of migrating from 1.9.1 to 1.11. Both 1.9 and 1.11 jobs talk to the same ZK for JM HA. This problem only surfaced in 1.11 jobs, which is why we think it is related to the version upgrade.

Best
Lu


RE: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Colletta, Edward

“but I'm not aware of any similar issue reported since the upgrade”

For the record, we experienced this same error on Flink 1.11.2 this past week.

 

From: Xintong Song <[hidden email]>
Sent: Friday, January 29, 2021 7:34 PM
To: user <[hidden email]>
Subject: Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

 

This email is from an external source - exercise caution regarding links and attachments.

 


Thank you~

Xintong Song

 

 

On Sat, Jan 30, 2021 at 8:27 AM Xintong Song <[hidden email]> wrote:

There's indeed a ZK version upgrading during 1.9 and 1.11, but I'm not aware of any similar issue reported since the upgrading.

I would suggest the following:

- Turn on the DEBUG log see if there's any valuable details

- Maybe try asking in the Apache Zookeeper community, see if this is a known issue.


Thank you~
Xintong Song


Thank you~

Xintong Song

 

On Sat, Jan 30, 2021 at 6:47 AM Lu Niu <[hidden email]> wrote:

Hi, Xintong

 

Thanks for replying. Could it relate to the zk version? We are a platform team at Pinterest in the middle of migrating form 1.9.1 to 1.11. Both 1.9 and 1.11 jobs talking to the same ZK for JM HA. This problem only surfaced in 1.11 jobs. That's why we think it is related to version upgrade. 

 

Best

Lu

 

On Thu, Jan 28, 2021 at 7:56 PM Xintong Song <[hidden email]> wrote:

The ZK client side uses 15s connection timeout and 60s session timeout in Flink. There's nothing similar to a heartbeat interval configured, which I assume is up to ZK's internal implementation. These things have not changed in FLink since at least 2017.

 

If both ZK client and server complain about timeout, and there's no gc issue spotted, I would consider a network instability.


Thank you~

Xintong Song

 

 

On Fri, Jan 29, 2021 at 3:15 AM Lu Niu <[hidden email]> wrote:

After checking the log I found the root cause is zk client timeout on TM:

```

2021-01-25 14:01:49,600 WARN org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f
2021-01-25 14:01:49,610 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 40020ms for sessionid 0x404f9ca531a5d6f, closing socket connection and attempting reconnect
2021-01-25 14:01:49,711 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2021-01-25 14:01:49,711 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 27ac39342913d29baac4cde13062c4a4 with leader id b5af099c17a05fcf15e7bbfcb74e49ea lost leadership.
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 27ac39342913d29baac4cde13062c4a4.
2021-01-25 14:01:49,712 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7).
2021-01-25 14:01:49,712 WARN org.apache.flink.runtime.taskmanager.Task - Sink: USER_EVENTS-spo_derived_event-SINK-SINKS.kafka (156/360) (d5b5887e639874cb70d7fef939b957b7) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 27ac39342913d29baac4cde13062c4a4 lost the leadership.

```

 

I checked that TM gc log, no gc issues. it also shows client timeout in zookeeper server log. How frequently the zk client sync with server side in flink? The log says client doesn't heartbeat to server for 40s. Any help? thanks!

 

Best

Lu

 

 



Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Xintong Song
Hi Colletta,

This error is kind of expected if the JobMaster / ResourceManager does not maintain a stable connection to the ZooKeeper service, which may be caused by network issues, GC pauses, or unstable ZK service availability.

By "similar issue", what I meant is that I'm not aware of any issue related to the ZK version upgrade that may cause the leadership loss.

Thank you~

Xintong Song
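
One quick way to rule the GC-pause cause in or out is to enable GC logging on the JobManager and TaskManager JVMs. Below is a minimal sketch, assuming a JDK 8 HotSpot JVM (which the `1.8.0_212` frames in the stack traces suggest); the log path is hypothetical:

```
# flink-conf.yaml: pass GC-logging flags to the Flink JVMs (JDK 8 flag syntax)
env.java.opts: -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-gc.log
```

Long stopped-time entries in that log around the moment of the leadership loss would point at GC rather than the network.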



On Sun, Jan 31, 2021 at 4:14 AM Colletta, Edward <[hidden email]> wrote:

“but I'm not aware of any similar issue reported since the upgrading”

For the record, we experienced this same error on Flink 1.11.2 this past week.

On Sat, Jan 30, 2021 at 8:27 AM Xintong Song <[hidden email]> wrote:

There was indeed a ZK version upgrade between 1.9 and 1.11, but I'm not aware of any similar issue reported since the upgrade.

I would suggest the following (see the sketch after this message):

- Turn on DEBUG logging and see if there are any valuable details.

- Maybe try asking in the Apache ZooKeeper community to see whether this is a known issue.


Thank you~

Xintong Song
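
For the DEBUG suggestion above, the relevant loggers can be raised selectively so the rest of the log stays readable. A minimal sketch for the `log4j.properties` shipped with Flink 1.11 (which uses Log4j 2 property syntax); the logger names target the leader-retrieval code and the shaded Curator/ZooKeeper packages visible in the logs above:

```
# log4j.properties: DEBUG only the leader-election / ZooKeeper paths
logger.leader.name = org.apache.flink.runtime.leaderretrieval
logger.leader.level = DEBUG
logger.curator.name = org.apache.flink.shaded.curator4
logger.curator.level = DEBUG
logger.zk.name = org.apache.flink.shaded.zookeeper3
logger.zk.level = DEBUG
```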

 

On Sat, Jan 30, 2021 at 6:47 AM Lu Niu <[hidden email]> wrote:

Hi, Xintong

 

Thanks for replying. Could it be related to the ZK version? We are a platform team at Pinterest in the middle of migrating from 1.9.1 to 1.11. Both 1.9 and 1.11 jobs talk to the same ZK for JM HA, yet this problem only surfaced in 1.11 jobs. That's why we think it is related to the version upgrade.

 

Best

Lu

 

On Thu, Jan 28, 2021 at 7:56 PM Xintong Song <[hidden email]> wrote:

On the client side, Flink uses a 15s connection timeout and a 60s session timeout for ZK. There is nothing like a heartbeat interval configured, which I assume is up to ZK's internal implementation. These settings have not changed in Flink since at least 2017.

 

If both the ZK client and the server complain about timeouts, and no GC issue is spotted, I would suspect network instability.


Thank you~

Xintong Song
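
For reference, these timeouts are exposed in `flink-conf.yaml`; here is a minimal sketch with the documented 1.11 defaults (the quorum address is hypothetical). The `40020ms` in the log above is also consistent with the ZooKeeper client's read timeout of roughly two thirds of the 60s session timeout, though that ratio is ZK-internal rather than a Flink setting:

```
# flink-conf.yaml: ZooKeeper HA client settings (values shown are the 1.11 defaults)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181  # hypothetical quorum
high-availability.zookeeper.client.session-timeout: 60000       # ms
high-availability.zookeeper.client.connection-timeout: 15000    # ms
high-availability.zookeeper.client.retry-wait: 5000             # ms between retries
high-availability.zookeeper.client.max-retry-attempts: 3
```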

 

 



RE: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Colletta, Edward

 

FYI, we experienced a similar error again: lost leadership, though not due to a timeout but to a disconnect from ZooKeeper. This time I examined the logs for other ZooKeeper-related errors and found that the Kafka cluster using the same ZooKeeper was also disconnected.

We run on AWS, and this seems to be AWS-related.



Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Lu Niu
Hi, Colletta

Thanks for sharing! Do you mind sharing a stack trace for that error as well? Thanks!

Best
Lu



Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

Lu Niu
FYI, my teammate Chen posted a similar question on the Apache Flink mailing list: "handle SUSPENDED in ZooKeeperLeaderRetrievalService". That is the root cause of the problem.
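
For context, that thread discusses how Flink's ZooKeeperLeaderRetrievalService reacts to Curator's SUSPENDED state (connection dropped, session possibly still alive) the same way as LOST (session expired), immediately revoking the leader and failing tasks. Below is a minimal Curator sketch of the distinction, for illustration only; the class and the lenient handling are hypothetical, not Flink's actual code:

```
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;

// Illustrates the SUSPENDED-vs-LOST distinction from the linked thread.
public class LenientLeaderConnectionListener implements ConnectionStateListener {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        switch (newState) {
            case SUSPENDED:
                // Connection blip: the ZK session may still be alive, so a
                // lenient policy keeps the cached leader and waits for
                // RECONNECTED or LOST instead of revoking immediately.
                break;
            case LOST:
                // Session expired: leader information is definitely stale.
                break;
            case RECONNECTED:
                // Session survived the blip: resume using the cached leader.
                break;
            default:
                break;
        }
    }
}
```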


