JobManager in HA with a single node loses leadership


JobManager in HA with a single node loses leadership

Julio Biason
Hey guys,

I'm seeing a weird error here: we have our JobManager configured in HA mode, but with a single JobManager in the cluster (the second one was on another machine that started showing a flaky network, so we removed it). Everything is running in Standalone mode.

Sometimes the jobs restart and the JobManager logs show this:

org.apache.flink.util.FlinkException: JobManager responsible for bbbae593c175e0c17c32718a56527ab9 lost the leadership.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1167)
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:137)
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1608)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id d4ca7942b20bdf87ccf9335f698a5029 timed out.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobManagerHeartbeatListener.lambda$notifyHeartbeatTimeout$0(TaskExecutor.java:1609)
        ... 15 more

If there is a single JobManager in the cluster... who is taking the leadership? Is that even possible?

--
Julio Biason, Software Engineer
AZION  |  Deliver. Accelerate. Protect.
Office: +55 51 3083 8101  |  Mobile: +55 51 99907 0554

Re: JobManager in HA with a single node loses leadership

tison
Hi Julio,

If the single JobManager is temporarily lost and later reconnects, it can be granted leadership again. And if you run Flink on YARN, the YARN ResourceManager (depending on its configuration) would start a new ApplicationMaster to take over as JobManager.

Best,
tison.


In reply to Julio Biason <[hidden email]>, who wrote on Fri, Sep 28, 2018 at 3:56 AM.

Re: JobManager in HA with a single node loses leadership

vino yang
Hi Julio,

Which version of Flink are you using? If it is 1.5+, you can try increasing the heartbeat timeout in the configuration [1].
Another possible cause is that the load on the TaskManager (TM) is too heavy: for example, a full GC can stall the JVM, and deadlocks or other issues can also cause a heartbeat timeout. Please monitor the relevant TM metrics closely.
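
For reference, the two settings involved are heartbeat.interval and heartbeat.timeout in flink-conf.yaml, both in milliseconds, with defaults of 10000 and 50000 as far as I know. Below is a minimal Python sketch (not part of Flink) that reads those two keys from config text and estimates whether a TM stall of a given length, such as a long full GC, would still fit inside the timeout. The helper names, the 120000 ms sample value, and the "pause plus one interval" rule of thumb are assumptions made for illustration.

```python
# Sketch only: helper names and the sample values are hypothetical.
# Defaults are Flink's documented defaults, in milliseconds.
DEFAULTS = {"heartbeat.interval": 10000, "heartbeat.timeout": 50000}


def heartbeat_settings(conf_text):
    """Return the effective heartbeat.interval / heartbeat.timeout in ms."""
    settings = dict(DEFAULTS)
    for raw in conf_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and blank lines
        if ":" not in line:
            continue
        key, value = (part.strip() for part in line.split(":", 1))
        if key in settings:
            settings[key] = int(value)
    return settings


def survives_pause(settings, pause_ms):
    """Rough estimate: the last heartbeat may have arrived up to one
    interval before the stall began, so add one interval to the pause."""
    return pause_ms + settings["heartbeat.interval"] < settings["heartbeat.timeout"]


# Example flink-conf.yaml fragment raising the timeout to two minutes.
# The value is illustrative, not a recommendation.
SAMPLE_CONF = """
heartbeat.interval: 10000
heartbeat.timeout: 120000
"""

if __name__ == "__main__":
    settings = heartbeat_settings(SAMPLE_CONF)
    print(settings, survives_pause(settings, 60000))
```

A larger timeout only hides long stalls; if the TM regularly pauses for tens of seconds, the GC or load problem is still worth fixing.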

In addition, it is better to have two or more JM instances in Standalone HA mode.

Thanks, vino.


In reply to Tzu-Li Chen <[hidden email]>, who wrote on Fri, Sep 28, 2018 at 8:56 AM.

Re: JobManager in HA with a single node loses leadership

Zhu Zhu
Hi Julio,

If you are using an HA mode that depends on another service such as ZooKeeper, you can also check whether that service was healthy at the time the JM lost leadership.
In our experience, a network partition between the JM and ZK, or a ZK object exceeding the maximum size limit, can also cause the JM to lose leadership.
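
One quick way to check that ZooKeeper is answering is its four-letter `ruok` command, to which a running server replies `imok`. Here is a minimal Python sketch, assuming the ZK client port is reachable (2181 is the usual default) and that `ruok` is permitted by the server (newer ZooKeeper releases require it to be listed in 4lw.commands.whitelist). The function names are hypothetical. It only shows that the server process responds; it says nothing about the JM's session or about znode sizes.

```python
import socket


def zk_four_letter(host, port, cmd="ruok", timeout=5.0):
    """Send a ZooKeeper four-letter command and return the reply text."""
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(cmd.encode("ascii"))
        chunks = []
        while True:
            data = sock.recv(4096)
            if not data:  # the server closes the connection after replying
                break
            chunks.append(data)
    return b"".join(chunks).decode("utf-8", "replace")


def zk_is_ok(host, port=2181):
    """True if the server answers 'imok'; False on any other reply,
    a refused connection, or a timeout."""
    try:
        return zk_four_letter(host, port).strip() == "imok"
    except OSError:
        return False
```

Running `zk_is_ok("zk-host-1")` (a placeholder host name) from the JM machine around the time of a restart would help separate "ZK unreachable from the JM" from "TM heartbeat stalled".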

In reply to vino yang <[hidden email]>, who wrote on Fri, Sep 28, 2018 at 9:24 AM.