Fwd: Heartbeat Timeout


Fwd: Heartbeat Timeout

Jan Brusch
Hi Robert,

do you have some additional info? For example, the last log messages of the unreachable TaskManagers. Is the job running in Kubernetes? Which state backend are you using?

From the first look of it, I have mostly seen this behaviour in cases where one or more TaskManagers shut down due to garbage collection issues or OutOfMemoryErrors.


Best regards

Jan

On 27.05.21 16:44, Robert Cullen wrote:

I have a job that fails after about one hour due to a TaskManager heartbeat timeout. How can I prevent this from happening?

2021-05-27 10:24:21
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
    at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1462)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1139)
    at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1079)
    at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:783)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
    at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
    at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
    at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
    at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:683)
    at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:2010)
    at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
    at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
    at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:361)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:249)
    at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:230)
    at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:497)
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1295)
    at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:111)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 10.42.0.49:6122-e26293 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1299)
    ... 27 more
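
For context, the trace above involves two separate settings: recovery is suppressed because no restart strategy is configured (NoRestartBackoffTimeStrategy), and the underlying failure is the TaskManager heartbeat timing out. Both can be adjusted in flink-conf.yaml; a minimal sketch with placeholder values, assuming a Flink 1.12/1.13-style configuration:

    # Let the job restart instead of failing permanently (values are placeholders)
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 5
    restart-strategy.fixed-delay.delay: 30 s

    # Relax the heartbeat between JobManager and TaskManagers
    # (defaults are 10000 ms interval and 50000 ms timeout)
    heartbeat.interval: 10000
    heartbeat.timeout: 120000

A longer heartbeat.timeout only buys time through long GC pauses; it does not address the underlying memory problem discussed in the replies below.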
--
Robert Cullen
240-475-4490
-- 
neuland  – Büro für Informatik GmbH
Konsul-Smidt-Str. 8g, 28217 Bremen

Phone (0421) 380107 57
Fax (0421) 380107 99
https://www.neuland-bfi.de

https://twitter.com/neuland
https://facebook.com/neulandbfi
https://xing.com/company/neulandbfi


Managing directors: Thomas Gebauer, Jan Zander
Register court: Amtsgericht Bremen, HRB 23395 HB
VAT ID: DE 246585501

Re: Heartbeat Timeout

Robert Cullen

Hello Jan,

My Flink cluster is running on a single-node Kubernetes cluster (RKE). I have the JVM heap size set at 2.08 GB and the managed memory at 2.93 GB. The TaskManager reaches the maximum JVM heap size after about one hour and then fails. Here is a snippet from the TaskManager log:

2021-05-27 15:36:36,040 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2021-05-27 15:36:36,041 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer reserved slots to the leader of job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,042 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:2, state:ALLOCATED, resource profile: ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=500.000mb (524288000 bytes), taskOffHeapMemory=0 bytes, managedMemory=750.000mb (786432000 bytes), networkMemory=146.000mb (153092098 bytes)}, allocationId: 2f2e7abd16f21e156cab15cfa0d1d090, jobId: c5ff9686e944f62a24c10c6bf20a5a55).
2021-05-27 15:36:36,042 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job c5ff9686e944f62a24c10c6bf20a5a55 from job leader monitoring.
2021-05-27 15:36:36,042 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job c5ff9686e944f62a24c10c6bf20a5a55.
2021-05-27 15:36:36,043 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot request 85433366f8bf1c5efd3b88f634676764 for job c5ff9686e944f62a24c10c6bf20a5a55 from resource manager with leader id 00000000000000000000000000000000.
2021-05-27 15:36:36,043 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated slot for 85433366f8bf1c5efd3b88f634676764.
2021-05-27 15:36:36,043 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job c5ff9686e944f62a24c10c6bf20a5a55 for job leader monitoring.
2021-05-27 15:36:36,043 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 with leader id 00000000-0000-0000-0000-000000000000.
2021-05-27 15:36:36,044 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2021-05-27 15:36:36,045 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_5 for job c5ff9686e944f62a24c10c6bf20a5a55.

I guess the simple resolution is to increase the JVM Heap Size?
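
For reference, the JVM heap and managed memory sizes mentioned above come from the TaskManager memory configuration in flink-conf.yaml, so increasing them is a configuration change on the TaskManager side. A minimal sketch with placeholder sizes, assuming Flink's default memory model:

    # Total memory of the TaskManager process
    # (covers task heap, managed memory, network buffers, metaspace and overhead)
    taskmanager.memory.process.size: 8g
    # Fraction of Flink memory reserved as managed memory (default 0.4)
    taskmanager.memory.managed.fraction: 0.4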



Re: Heartbeat Timeout

Jan Brusch

Hi Robert,

that sounds like a case of either your application state ultimately being bigger than the available RAM, or a memory leak in your application (e.g., some state is not properly cleaned up once it is no longer needed).

If you have the resources available, you could try increasing the TaskManager RAM by a large amount and see where RAM usage tops out, if it ever does; in the case of a memory leak it would keep growing indefinitely. From there you could reason about how to fix the leak, or how to achieve your goal with a smaller application state.

A remedy for application state larger than your available RAM is the RocksDB state backend, which keeps state on disk and therefore allows state larger than the available RAM. But that requires your Kubernetes nodes to be equipped with a fast local disk (optimally an SSD).
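
For reference, switching to RocksDB is mostly a configuration change; a minimal flink-conf.yaml sketch, with the paths as placeholders and assuming a Flink 1.12/1.13-style setup:

    state.backend: rocksdb
    state.backend.incremental: true
    # Durable checkpoint storage (placeholder path)
    state.checkpoints.dir: s3://my-bucket/flink-checkpoints
    # Local working directory for RocksDB; should sit on fast local disk (placeholder path)
    state.backend.rocksdb.localdir: /data/flink/rocksdb

The backend can also be set per job in the application code, but the cluster-wide setting is usually the simpler starting point.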

That's how I would approach your problem...


Hope that helps

Jan


Re: Heartbeat Timeout

Matthias
Hi Robert,
the increasing heap memory usage could be due to a memory leak in the user code. Have you analyzed a heap dump? Regarding the TM logs you shared: I don't see anything suspicious there, nothing about memory problems. Are those the correct logs?

Best,
Matthias


Re: Heartbeat Timeout

Robert Cullen
Matthias, I increased the JVM heap size as Jan suggested, and it appears to be a memory leak in the user code (although I'm not sure why, since this is a simple job that uses a loop to simulate data being written to an S3 data store). Yes, the logs show no apparent problem, but the timestamp corresponds to the job failure. Forgive me, but I don't know how to analyze a heap dump.
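
In case it helps, one low-effort way to get a heap dump from a TaskManager pod is to let the JVM write one automatically when it runs out of memory, via the TaskManager JVM options in flink-conf.yaml (the path is a placeholder):

    env.java.opts.taskmanager: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager.hprof"

The resulting .hprof file can be copied out of the pod (for example with kubectl cp) and opened in a heap analyzer such as Eclipse MAT or VisualVM to see which objects retain the memory.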
