Flink job restart when one ZK node is down

Flink job restart when one ZK node is down

Lu Niu
Hi, Flink users,

We use a ZooKeeper cluster of 5 nodes for JobManager HA. When we terminate one node for maintenance, we notice that lots of Flink jobs fully restart. The error looks like:
```
org.apache.flink.util.FlinkException: ResourceManager leader changed to new address null
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.notifyOfNewResourceManagerLeader(TaskExecutor.java:1093)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$800(TaskExecutor.java:173)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerLeaderListener.lambda$notifyLeaderAddress$0(TaskExecutor.java:1816)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:539)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:227)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
    at akka.actor.ActorCell.invoke(ActorCell.scala:581)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
    at akka.dispatch.Mailbox.run(Mailbox.scala:229)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
Is this expected? If not, how can we avoid such restarts? Thanks!

Best
Lu
Pinterest, Inc.

Re: Flink job restart when one ZK node is down

yidan zhao
Yes, it is expected; I have run into this problem as well.

Re: Flink job restart when one ZK node is down

Yang Wang
It is a known issue; please refer to FLINK-10052 [1] for more information.
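Before that fix, a TaskManager treats a suspended ZooKeeper connection as a lost ResourceManager/JobManager leader (hence the "leader changed to new address null" error) and fails its running tasks. On releases that contain the fix (1.14+, if I remember correctly), there is a switch to tolerate suspended connections. A rough sketch of the relevant flink-conf.yaml entries; please double-check the exact option names and defaults against the docs for your Flink version:
```
# flink-conf.yaml -- sketch only, verify option names/defaults for your Flink version.

# From the FLINK-10052 fix: keep the current leader information while the
# ZooKeeper connection is merely SUSPENDED (still retrying) instead of
# immediately treating the leader as lost.
high-availability.zookeeper.client.tolerate-suspended-connections: true

# Optionally give the ZooKeeper client more time to reconnect to the remaining
# quorum members before the session is considered lost
# (defaults are 60000 ms and 15000 ms).
high-availability.zookeeper.client.session-timeout: 120000
high-availability.zookeeper.client.connection-timeout: 30000
```
Note that the timeouts alone do not change how a suspended connection is handled; the tolerate-suspended-connections option is the part that addresses this particular restart.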


Best,
Yang

[1] https://issues.apache.org/jira/browse/FLINK-10052
