Error about "Rejected TaskExecutor registration at the ResourceManger"

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Error about "Rejected TaskExecutor registration at the ResourceManger"

Kai Fu
Hi team,

We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[hidden email]:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...

--
Best wishes,
- Kai
Reply | Threaded
Open this post in threaded view
|

Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

Till Rohrmann
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen is the following: If Yarn restarts the application master, Flink will try to recover previously started containers. If this is not possible or Yarn only tells about a subset of the previously allocated containers, then it can happen that if a container that has not been reported to the new ResourceManager tries to register is rejected because it is not known. The idea behind this behaviour is to only accept those resources which one has knowingly requested in order to free other resources which might belong to another Yarn application.

In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <[hidden email]> wrote:
Hi team,

We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[hidden email]:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...

--
Best wishes,
- Kai
Reply | Threaded
Open this post in threaded view
|

Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

Kai Fu
HI Till, 

Thank you for your response, per my observation that the process lasted for ~1 day, and cannot be recovered and we killed the cluster finally.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <[hidden email]> wrote:
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen is the following: If Yarn restarts the application master, Flink will try to recover previously started containers. If this is not possible or Yarn only tells about a subset of the previously allocated containers, then it can happen that if a container that has not been reported to the new ResourceManager tries to register is rejected because it is not known. The idea behind this behaviour is to only accept those resources which one has knowingly requested in order to free other resources which might belong to another Yarn application.

In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <[hidden email]> wrote:
Hi team,

We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[hidden email]:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...

--
Best wishes,
- Kai


--
Best wishes,
- Kai
Reply | Threaded
Open this post in threaded view
|

Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

JING ZHANG
Hi Kai,
The reason why job job cannot be recovered maybe not directly related to the exception you mentioned in your email.
Would you like provide complete jobmanager.log and taskmanager.log. Maybe we could find some hints there. 

Best regards,
JING ZHANG

Kai Fu <[hidden email]> 于2021年6月2日周三 上午7:23写道:
HI Till, 

Thank you for your response, per my observation that the process lasted for ~1 day, and cannot be recovered and we killed the cluster finally.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <[hidden email]> wrote:
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen is the following: If Yarn restarts the application master, Flink will try to recover previously started containers. If this is not possible or Yarn only tells about a subset of the previously allocated containers, then it can happen that if a container that has not been reported to the new ResourceManager tries to register is rejected because it is not known. The idea behind this behaviour is to only accept those resources which one has knowingly requested in order to free other resources which might belong to another Yarn application.

In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <[hidden email]> wrote:
Hi team,

We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[hidden email]:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...

--
Best wishes,
- Kai


--
Best wishes,
- Kai
Reply | Threaded
Open this post in threaded view
|

Re: Error about "Rejected TaskExecutor registration at the ResourceManger"

Kai Fu
Hi Jing,

Thank you for your reply, that cluster is terminated and will provide the log if it occurs again.

On Wed, Jun 2, 2021 at 11:17 AM JING ZHANG <[hidden email]> wrote:
Hi Kai,
The reason why job job cannot be recovered maybe not directly related to the exception you mentioned in your email.
Would you like provide complete jobmanager.log and taskmanager.log. Maybe we could find some hints there. 

Best regards,
JING ZHANG

Kai Fu <[hidden email]> 于2021年6月2日周三 上午7:23写道:
HI Till, 

Thank you for your response, per my observation that the process lasted for ~1 day, and cannot be recovered and we killed the cluster finally.

On Tue, Jun 1, 2021 at 9:47 PM Till Rohrmann <[hidden email]> wrote:
Hi Kai,

The rejection you are seeing should not be serious. The way this can happen is the following: If Yarn restarts the application master, Flink will try to recover previously started containers. If this is not possible or Yarn only tells about a subset of the previously allocated containers, then it can happen that if a container that has not been reported to the new ResourceManager tries to register is rejected because it is not known. The idea behind this behaviour is to only accept those resources which one has knowingly requested in order to free other resources which might belong to another Yarn application.

In any case, the newly started Flink ResourceManager should request new containers so that there are enough TaskManagers available to run your job (assuming that the Yarn cluster has enough resources). Hence, the cluster should recover from this situation and there should not be a lot to worry about.

Cheers,
Till

On Sun, May 30, 2021 at 7:36 AM Kai Fu <[hidden email]> wrote:
Hi team,

We encountered an issue during recovery from checkpoint. It's recovering because the downstream Kafka sink is full for a while and the job is failed and keeps trying to recover(The downstream is full for about 4 hours). The job cannot recover from checkpoint successfully even if after we scaled up the Kafka cluster and shows the following exception. Is there any guidance on how to locate and avoid this kind of issue?

2021-05-30 01:31:21,419 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2021-05-30 01:31:21,422 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved ResourceManager address, beginning registration
2021-05-30 01:31:21,427 ERROR org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Fatal error occurred in TaskExecutor akka.tcp://[hidden email]:36631/user/rpc/taskmanager_0.
org.apache.flink.util.FlinkException: The TaskExecutor's registration at the ResourceManager akka.tcp://[hidden email]:36833/user/rpc/resourcemanager_* has been rejected: Rejected TaskExecutor registration at the ResourceManger because: The ResourceManager does not recognize this TaskExecutor.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2254) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutor$ResourceManagerRegistrationListener.onRegistrationRejection(TaskExecutor.java:2209) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:109) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.taskexecutor.TaskExecutorToResourceManagerConnection.onRegistrationRejection(TaskExecutorToResourceManagerConnection.java:40) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.registration.RegisteredRpcConnection.lambda$createNewRegistration$0(RegisteredRpcConnection.java:269) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_272]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_272]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.13.1.jar:1.13.1]
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.13.1.jar:1.13.1]
2021-05-30 01:31:21,428 ERROR org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Fatal error occurred while executing the TaskManager. Shutting it down...

--
Best wishes,
- Kai


--
Best wishes,
- Kai


--
Best wishes,
- Kai