Hi,

Good that we are more or less on track with this problem :) But the problem here is not that the heap size is too small, but that your kernel is running out of memory and starts killing processes. Either:

1. some other process is using the available memory,
2. increase the memory allocation of your machine/virtual machine/container/cgroup, or
3. decrease the heap size of Flink's JVM, or its non-heap size (decrease the network memory buffer pool).

Of course, for any given job/state size/configuration/cluster size there is some minimal reasonable memory size that you have to assign to Flink; otherwise you will have poor performance and/or constant garbage collections and/or you will start getting OOM errors from the JVM (don't confuse those with the OS/kernel's OOM errors - those two are on a different level).

Piotrek

On 14 Aug 2018, at 07:36, Shailesh Jain <[hidden email]> wrote:

Hi Piotrek,

Thanks for your reply. I checked through the syslogs for that time, and I see this:

Aug 8 13:20:52 smoketest kernel: [1786160.856662] Out of memory: Kill process 2305 (java) score 468 or sacrifice child
Aug 8 13:20:52 smoketest kernel: [1786160.859091] Killed process 2305 (java) total-vm:6120624kB, anon-rss:3661216kB, file-rss:16676kB

As you pointed out, the kernel killed the task manager process.

If I had already set the max heap size for the JVM (to 3 GB in this case), and the memory usage stats showed 2329 MB being used 90 seconds earlier, it seems a bit unlikely for operators to consume 700 MB of heap space in that short time, because our event ingestion rate is not that high (close to 10 events per minute).

2018-08-08 13:19:23,341 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]

Is it possible to log an individual operator's memory consumption? This would help in narrowing down on the root cause. There were around 50 operators running (~8 Kafka sources/sinks, ~8 window operators, and the rest CEP operators).

Thanks,
Shailesh

On Fri, Aug 10, 2018 at 4:48 PM, Piotr Nowojski <[hidden email]> wrote:

Hi,

Please post full TaskManager logs, including stderr and stdout. (Have you checked stderr/stdout for some messages?)

I can think of a couple of reasons:

1. process segfault
2. process killed by the OS
3. OS failure

1. Should be visible as some message in the stderr/stdout file, and can be caused by, for example, a bug in the JVM, RocksDB or some other native library/code.
2. Is your system maybe running out of memory? The kernel might kill a process if that's happening. You can also check the system (Linux?) logs for errors that correlate in time. Where those logs are depends on your OS.
3. This might be tricky, but I have seen kernel failures that prevented any messages from being logged, for example.
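[Editor's note] The distinction between the kernel's OOM killer and JVM-level OOM errors can be made concrete with the numbers already quoted in this thread: the kernel killed a process whose resident set was about 3.6 GB even though -Xmx was 3 GB, because metaspace, code cache, direct buffers, thread stacks and other native allocations live outside the Java heap. A minimal sketch of that arithmetic (the log line and figures are copied from this thread; the "native overhead" residual is only a rough estimate, not something the JVM reports):

```python
import re

# Kernel OOM-killer report quoted earlier in the thread (syslog format).
oom_line = ("Aug 8 13:20:52 smoketest kernel: [1786160.859091] "
            "Killed process 2305 (java) total-vm:6120624kB, "
            "anon-rss:3661216kB, file-rss:16676kB")

m = re.search(r"Killed process (\d+) \((\S+)\) total-vm:(\d+)kB, "
              r"anon-rss:(\d+)kB, file-rss:(\d+)kB", oom_line)
pid, name = int(m.group(1)), m.group(2)
total_vm_kb, anon_rss_kb, file_rss_kb = (int(m.group(i)) for i in (3, 4, 5))

# What the kernel actually saw resident when it pulled the trigger.
rss_mb = (anon_rss_kb + file_rss_kb) // 1024

# Figures from the TaskManager's own stats lines, logged 90 seconds earlier.
heap_max_mb = 3072                      # -Xmx: the kernel may see up to this once touched
non_heap_committed_mb = 197             # metaspace + code cache + compressed class space
direct_mb = 38101792 // (1024 * 1024)   # direct byte buffers (network stack etc.)

jvm_visible_mb = heap_max_mb + non_heap_committed_mb + direct_mb
# Residual: thread stacks, GC bookkeeping, RocksDB / native libraries, allocator slack.
native_overhead_mb = rss_mb - jvm_visible_mb

print(f"pid={pid} ({name}): resident ≈ {rss_mb} MB")
print(f"JVM-visible budget ≈ {jvm_visible_mb} MB, "
      f"unaccounted native ≈ {native_overhead_mb} MB")
```

If the residual is large or growing, likely suspects in a setup like this one are native allocations (e.g. RocksDB, which Piotr mentions above) and direct network buffers, which is why decreasing the heap or the network buffer pool can stop the kernel's OOM killer without changing the job itself.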
Besides this TaskManager failure, is your machine operating normally, without any other problems/crashes/restarts?

Piotrek

On 10 Aug 2018, at 06:59, Shailesh Jain <[hidden email]> wrote:

Hi,

I hit a similar issue yesterday. The task manager died suspiciously; there are no error logs in the task manager logs, but I see the following exceptions in the job manager logs:

2018-08-05 18:03:28,322 ERROR akka.remote.Remoting - Association to [akka.tcp://flink@localhost:34483] with UID [328996232] irrecoverably failed. Quarantining address.
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
	at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

but almost 3 days later it hit this:

2018-08-08 13:22:00,061 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: 5ee5de1112776c404541743b63ae0fe0 @ localhost (dataPort=44997)
	at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
	at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
	at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
	at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
	at akka.actor.ActorCell.invoke(ActorCell.scala:494)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

followed by:

2018-08-08 13:22:20,090 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Internal state machine job (1057c13d169dae609466210174e2cc8b) switched from state RUNNING to FAILING.
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #2 (Source: Custom Source -> Filter (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < fbd084243e87c3fdf3c709a0f2eecfd7 > in sharing group < SlotSharingGroup [fa00013ef15454ea93d21e8c346e0dd4, fbd084243e87c3fdf3c709a0f2eecfd7, 8f5517c035f67da702f459ef5f3b849f] >. Resources available to scheduler: Number of instances=0, total number of slots=0, available slots=0
	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:263)
	at org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:142)
	at org.apache.flink.runtime.executiongraph.Execution.lambda$allocateAndAssignSlotForExecution$1(Execution.java:440)
	at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
	at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
	at org.apache.flink.runtime.executiongraph.Execution.allocateAndAssignSlotForExecution(Execution.java:438)
	at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.allocateResourcesForAll(ExecutionJobVertex.java:503)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleEager(ExecutionGraph.java:900)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.scheduleForExecution(ExecutionGraph.java:854)
	at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1175)
	at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
	at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

There are no error logs in the task manager, and the following is the last memory consumption log by the task manager:

2018-08-08 13:19:23,341 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2329/3072/3072 MB, NON HEAP: 154/197/-1 MB (used/committed/max)]
2018-08-08 13:19:23,341 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 115, Total Capacity: 38101792, Used Memory: 38101793
2018-08-08 13:19:23,341 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 52/55/240 MB (used/committed/max)], [Metaspace: 90/125/-1 MB (used/committed/max)], [Compressed Class Space: 11/17/1024 MB (used/committed/max)]
2018-08-08 13:19:23,341 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 300736, GC COUNT: 6574], [G1 Old Generation, GC TIME (ms): 152, GC COUNT: 2]

So I think it rules out OOM as a cause for this crash. Any ideas/leads to debug this would be really helpful. The cluster is running on version 1.4.2.

Thanks,
Shailesh

On Mon, Mar 26, 2018 at 4:18 PM, Alexander Smirnov <[hidden email]> wrote:

Hi Piotr,

I didn't find anything special in the logs before the failure. Here are the logs, please take a look.

The configuration is:

3 task managers:
qafdsflinkw011.scl
qafdsflinkw012.scl
qafdsflinkw013.scl - lost connection

3 job managers:
qafdsflinkm011.scl - the leader
qafdsflinkm012.scl
qafdsflinkm013.scl

3 zookeepers:
qafdsflinkzk011.scl
qafdsflinkzk012.scl
qafdsflinkzk013.scl

Thank you,
Alex

On Wed, Mar 21, 2018 at 6:23 PM Piotr Nowojski <[hidden email]> wrote:

Hi,

Does the issue really happen after 48 hours? Is there some indication of a failure in the TaskManager log?

If you are still unable to solve the problem, please provide full TaskManager and JobManager logs.

Piotrek

On 21 Mar 2018, at 16:00, Alexander Smirnov <[hidden email]> wrote:

One more question - I see a lot of lines like the following in the logs:

[2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
[2018-03-21 00:34:15,208] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,235] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,256] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:44744] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)
[2018-03-21 00:34:15,266] WARN Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] with unknown UID is irrecoverably failed. Address cannot be quarantined without knowing the UID, gating instead for 5000 ms. (akka.remote.Remoting)

The host is available, but I don't understand where the port number comes from. The Task Manager uses another port (which is printed in the logs on startup).

Could you please help me understand why this happens?

Thank you,
Alex

On Wed, Mar 21, 2018 at 4:19 PM Alexander Smirnov <[hidden email]> wrote:

Hello,

I've assembled a standalone cluster of 3 task managers and 3 job managers (and 3 ZK) following the instructions at
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/cluster_setup.html and
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/jobmanager_high_availability.html

It works ok, but randomly, task managers become unavailable. The JobManager has an exception like the one below in its logs:

[2018-03-19 00:33:10,211] WARN Association with remote system [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413] has failed, address is now gated for [5000] ms.
Reason: [Association failed with [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:42413]] Caused by: [Connection refused: qafdsflinkw811.nn.five9lab.com/10.5.61.124:42413] (akka.remote.ReliableDeliverySupervisor)

[2018-03-21 00:30:35,975] ERROR Association to [akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320] with UID [1500204560] irrecoverably failed. Quarantining address. (akka.remote.Remoting)
java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 48.0 hours)
	at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:375)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:203)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I can't find a reason for this exception, any ideas?

Thank you,
Alex
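[Editor's note] On Alexander's question about where the unfamiliar port numbers come from: an Akka remote system is identified by its host, port, and a UID chosen at startup, so stale associations to earlier incarnations of a peer (e.g. a TaskManager that restarted and bound a fresh port) keep appearing in the logs with ports that no longer match the currently listening process. This is a plausible reading of the quoted messages, not a confirmed diagnosis. A small sketch that groups those messages by host, showing one machine accumulating many dead endpoints (log lines copied from this thread):

```python
import re
from collections import defaultdict

# Association-failure lines quoted earlier in the thread (trailing text trimmed).
log_lines = [
    "[2018-03-21 00:30:35,975] ERROR Association to "
    "[akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:35320] with UID "
    "[1500204560] irrecoverably failed. Quarantining address.",
    "[2018-03-21 00:34:15,208] WARN Association to "
    "[akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:41068] with unknown UID "
    "is irrecoverably failed.",
    "[2018-03-21 00:34:15,235] WARN Association to "
    "[akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40677] with unknown UID "
    "is irrecoverably failed.",
    "[2018-03-21 00:34:15,256] WARN Association to "
    "[akka.tcp://flink@qafdsflinkw811.nn.five9lab.com:40382] with unknown UID "
    "is irrecoverably failed.",
]

# Collect the distinct remote ports mentioned for each host.
ports_by_host = defaultdict(set)
for line in log_lines:
    m = re.search(r"akka\.tcp://flink@([^:\]]+):(\d+)", line)
    if m:
        ports_by_host[m.group(1)].add(int(m.group(2)))

for host, ports in sorted(ports_by_host.items()):
    print(f"{host}: {len(ports)} distinct dead ports -> {sorted(ports)}")
```

One host piling up many quarantined or gated ports in a short window is consistent with repeated restarts or connection churn on that machine, rather than with a misconfigured listening port.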