CliFrontend hang in the local case?

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

CliFrontend hang in the local case?

kedar mhaswade
I am seeing a hang where the main thread of CliFrontend goes to timed waiting. This appears like a livelock. My local setup is simple: A job manager, a task manager on MacOS. My algorithm is based on Gelly's vertex centric computation. The resultant graph's vertex  count is about 4 million. I am printing the vertices to a file using 
resultVertices.writeAsCsv(save, FileSystem.WriteMode.OVERWRITE);
and that seems to write the csv alright, however my invocation of flink run appears hung because of the timed waiting of the main thread in CliFrontend. My config in flink.yaml is simple:

akka.client.timeout: 60 min
#taskmanager.heap.mb: 16384
#jobmanager.heap.mb: 8192
# 25 Feb
taskmanager.numberOfTaskSlots: 4
taskmanager.heap.mb: 50000
jobmanager.heap.mb: 20000

The log does show:

02/25/2018 18:06:34 DataSink (CsvOutputFormat (path: file:/tmp/nis, delimiter: ,))(1/1) switched to FINISHED

The main thread's dump is here:
"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
  at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.ready(package.scala:169)
  at scala.concurrent.Await.ready(package.scala)
  at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
  at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
  at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
  at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
  at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
  at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
  at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
  at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
 
StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143) is the above line in Java source code.

The DataSink(Collect) on dashboard shows that it is still running [1]! I have been able to submit another smaller job which runs to completion. There is nothing suspicious in task manager or job manager log or thread dumps.

Any idea what is going on?

Regards,
Kedar

The entire thread dump of the CliFrontend JVM (that appears hung) via jstack is:
# taken on 26 Feb 2018
# 2018-02-26 09:33:19
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"Attach Listener" #67 daemon prio=9 os_prio=31 tid=0x00007ffa6cb4f800 nid=0x125f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-1" #66 daemon prio=5 os_prio=31 tid=0x00007ffa6e1b5000 nid=0x307 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"flink-akka.remote.default-remote-dispatcher-15" #45 prio=5 os_prio=31 tid=0x00007ffa6e26d000 nid=0x6303 waiting on condition [0x0000700012140000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"Hashed wheel timer #1" #38 prio=5 os_prio=31 tid=0x00007ffa6e26a800 nid=0x6103 waiting on condition [0x000070001203d000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-14" #44 prio=5 os_prio=31 tid=0x00007ffa6dfb1000 nid=0x6003 waiting on condition [0x0000700011f3a000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.remote.default-remote-dispatcher-13" #43 prio=5 os_prio=31 tid=0x00007ffa6e911000 nid=0x9c03 waiting on condition [0x0000700011e37000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"New I/O server boss #6" #42 prio=5 os_prio=31 tid=0x00007ffa6e8cb000 nid=0x9e03 runnable [0x0000700011d34000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772b7a1e8> (a sun.nio.ch.Util$3)
- locked <0x0000000772b7a1f8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772b7a198> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #5" #41 prio=5 os_prio=31 tid=0x00007ffa6cb1b800 nid=0x5e03 runnable [0x0000700011c31000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bc9ff0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bc9fe0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca000> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #4" #40 prio=5 os_prio=31 tid=0x00007ffa6cad5800 nid=0xa003 runnable [0x0000700011b2e000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bca7c0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bca7b0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca680> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O boss #3" #39 prio=5 os_prio=31 tid=0x00007ffa6dfd9800 nid=0x5c03 runnable [0x0000700011a2b000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcb0d8> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcb0c8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcb0e8> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #2" #37 prio=5 os_prio=31 tid=0x00007ffa6dfac800 nid=0xa103 runnable [0x0000700011928000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcba68> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcba58> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcba78> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #1" #36 prio=5 os_prio=31 tid=0x00007ffa6df07800 nid=0xa203 runnable [0x0000700011825000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bd8528> (a sun.nio.ch.Util$3)
- locked <0x0000000772bd8518> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcd158> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-5" #35 prio=5 os_prio=31 tid=0x00007ffa6f143000 nid=0xa303 waiting on condition [0x0000700011722000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-4" #34 prio=5 os_prio=31 tid=0x00007ffa6ca88000 nid=0xa403 waiting on condition [0x000070001161f000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-3" #33 prio=5 os_prio=31 tid=0x00007ffa6b4dd000 nid=0xa603 waiting on condition [0x000070001151c000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-2" #32 prio=5 os_prio=31 tid=0x00007ffa6df05800 nid=0xa803 waiting on condition [0x0000700011419000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-scheduler-1" #31 prio=5 os_prio=31 tid=0x00007ffa6f0e4000 nid=0x571b sleeping[0x0000700011316000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007ffa6c025800 nid=0x3d03 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c800 nid=0x3c03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007ffa6c00b800 nid=0x4203 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007ffa6b00a800 nid=0x4403 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007ffa6b009800 nid=0x4507 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007ffa6b03a800 nid=0x4c03 in Object.wait() [0x0000700010afe000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007ffa6b037800 nid=0x3003 in Object.wait() [0x00007000109fb000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.ready(package.scala:169)
at scala.concurrent.Await.ready(package.scala)
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)

"VM Thread" os_prio=31 tid=0x00007ffa6c838800 nid=0x4e03 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011000 nid=0x1f07 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011800 nid=0x2003 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012000 nid=0x2a03 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012800 nid=0x2b03 runnable

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007ffa6b004000 nid=0x2c03 runnable

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007ffa6b008800 nid=0x5103 runnable

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012000 nid=0x5003 runnable

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012800 nid=0x4f03 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007ffa6c85f800 nid=0x4003 waiting on condition

JNI global references: 306

[1]
Inline image 1
Reply | Threaded
Open this post in threaded view
|

Re: CliFrontend hang in the local case?

Fabian Hueske-2
Hi Kedar,

Thanks for reporting this problem. Which Flink version do you use?

Also I noticed in the stack track that you are calling DataSet.print() which internally uses DataSet.collect().
Collect works by sending all data in an Akka message to the client.
Your configuration of one hour akka.client.timeout is quite unusual and result in a long waiting time until the timeout kicks in.

I'd try to remove the DataSet.print() or reduce the akka.client.timeout.

Best, Fabian

2018-02-26 20:14 GMT+01:00 kedar mhaswade <[hidden email]>:
I am seeing a hang where the main thread of CliFrontend goes to timed waiting. This appears like a livelock. My local setup is simple: A job manager, a task manager on MacOS. My algorithm is based on Gelly's vertex centric computation. The resultant graph's vertex  count is about 4 million. I am printing the vertices to a file using 
resultVertices.writeAsCsv(save, FileSystem.WriteMode.OVERWRITE);
and that seems to write the csv alright, however my invocation of flink run appears hung because of the timed waiting of the main thread in CliFrontend. My config in flink.yaml is simple:

akka.client.timeout: 60 min
#taskmanager.heap.mb: 16384
#jobmanager.heap.mb: 8192
# 25 Feb
taskmanager.numberOfTaskSlots: 4
taskmanager.heap.mb: 50000
jobmanager.heap.mb: 20000

The log does show:

02/25/2018 18:06:34 DataSink (CsvOutputFormat (path: file:/tmp/nis, delimiter: ,))(1/1) switched to FINISHED

The main thread's dump is here:
"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
  at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.ready(package.scala:169)
  at scala.concurrent.Await.ready(package.scala)
  at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
  at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
  at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
  at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
  at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
  at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
  at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
  at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
 
StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143) is the above line in Java source code.

The DataSink(Collect) on dashboard shows that it is still running [1]! I have been able to submit another smaller job which runs to completion. There is nothing suspicious in task manager or job manager log or thread dumps.

Any idea what is going on?

Regards,
Kedar

The entire thread dump of the CliFrontend JVM (that appears hung) via jstack is:
# taken on 26 Feb 2018
# 2018-02-26 09:33:19
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"Attach Listener" #67 daemon prio=9 os_prio=31 tid=0x00007ffa6cb4f800 nid=0x125f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-1" #66 daemon prio=5 os_prio=31 tid=0x00007ffa6e1b5000 nid=0x307 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"flink-akka.remote.default-remote-dispatcher-15" #45 prio=5 os_prio=31 tid=0x00007ffa6e26d000 nid=0x6303 waiting on condition [0x0000700012140000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"Hashed wheel timer #1" #38 prio=5 os_prio=31 tid=0x00007ffa6e26a800 nid=0x6103 waiting on condition [0x000070001203d000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-14" #44 prio=5 os_prio=31 tid=0x00007ffa6dfb1000 nid=0x6003 waiting on condition [0x0000700011f3a000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.remote.default-remote-dispatcher-13" #43 prio=5 os_prio=31 tid=0x00007ffa6e911000 nid=0x9c03 waiting on condition [0x0000700011e37000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"New I/O server boss #6" #42 prio=5 os_prio=31 tid=0x00007ffa6e8cb000 nid=0x9e03 runnable [0x0000700011d34000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772b7a1e8> (a sun.nio.ch.Util$3)
- locked <0x0000000772b7a1f8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772b7a198> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #5" #41 prio=5 os_prio=31 tid=0x00007ffa6cb1b800 nid=0x5e03 runnable [0x0000700011c31000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bc9ff0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bc9fe0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca000> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #4" #40 prio=5 os_prio=31 tid=0x00007ffa6cad5800 nid=0xa003 runnable [0x0000700011b2e000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bca7c0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bca7b0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca680> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O boss #3" #39 prio=5 os_prio=31 tid=0x00007ffa6dfd9800 nid=0x5c03 runnable [0x0000700011a2b000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcb0d8> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcb0c8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcb0e8> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #2" #37 prio=5 os_prio=31 tid=0x00007ffa6dfac800 nid=0xa103 runnable [0x0000700011928000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcba68> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcba58> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcba78> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #1" #36 prio=5 os_prio=31 tid=0x00007ffa6df07800 nid=0xa203 runnable [0x0000700011825000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bd8528> (a sun.nio.ch.Util$3)
- locked <0x0000000772bd8518> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcd158> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-5" #35 prio=5 os_prio=31 tid=0x00007ffa6f143000 nid=0xa303 waiting on condition [0x0000700011722000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-4" #34 prio=5 os_prio=31 tid=0x00007ffa6ca88000 nid=0xa403 waiting on condition [0x000070001161f000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-3" #33 prio=5 os_prio=31 tid=0x00007ffa6b4dd000 nid=0xa603 waiting on condition [0x000070001151c000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-2" #32 prio=5 os_prio=31 tid=0x00007ffa6df05800 nid=0xa803 waiting on condition [0x0000700011419000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-scheduler-1" #31 prio=5 os_prio=31 tid=0x00007ffa6f0e4000 nid=0x571b sleeping[0x0000700011316000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007ffa6c025800 nid=0x3d03 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c800 nid=0x3c03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007ffa6c00b800 nid=0x4203 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007ffa6b00a800 nid=0x4403 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007ffa6b009800 nid=0x4507 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007ffa6b03a800 nid=0x4c03 in Object.wait() [0x0000700010afe000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007ffa6b037800 nid=0x3003 in Object.wait() [0x00007000109fb000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.ready(package.scala:169)
at scala.concurrent.Await.ready(package.scala)
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)

"VM Thread" os_prio=31 tid=0x00007ffa6c838800 nid=0x4e03 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011000 nid=0x1f07 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011800 nid=0x2003 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012000 nid=0x2a03 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012800 nid=0x2b03 runnable

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007ffa6b004000 nid=0x2c03 runnable

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007ffa6b008800 nid=0x5103 runnable

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012000 nid=0x5003 runnable

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012800 nid=0x4f03 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007ffa6c85f800 nid=0x4003 waiting on condition

JNI global references: 306

[1]
Inline image 1

Reply | Threaded
Open this post in threaded view
|

Re: CliFrontend hang in the local case?

kedar mhaswade
Hi Fabian,

Thank you for your response.

I had run into some other issue and that is when I increased the akka.client.timeout. I will reduce that (remove to make it the default). Let me try to remove the explicit print and get back to you. I am using Flink 1.4. I posted this because the thread dump did not reveal anything significant and CliFrontend just wouldn't exit, it is waiting for something to happen, but I couldn't tell exactly what.

Regards,
Kedar


On Tue, Feb 27, 2018 at 1:32 AM, Fabian Hueske <[hidden email]> wrote:
Hi Kedar,

Thanks for reporting this problem. Which Flink version do you use?

Also I noticed in the stack track that you are calling DataSet.print() which internally uses DataSet.collect().
Collect works by sending all data in an Akka message to the client.
Your configuration of one hour akka.client.timeout is quite unusual and result in a long waiting time until the timeout kicks in.

I'd try to remove the DataSet.print() or reduce the akka.client.timeout.

Best, Fabian

2018-02-26 20:14 GMT+01:00 kedar mhaswade <[hidden email]>:
I am seeing a hang where the main thread of CliFrontend goes to timed waiting. This appears like a livelock. My local setup is simple: A job manager, a task manager on MacOS. My algorithm is based on Gelly's vertex centric computation. The resultant graph's vertex  count is about 4 million. I am printing the vertices to a file using 
resultVertices.writeAsCsv(save, FileSystem.WriteMode.OVERWRITE);
and that seems to write the csv alright, however my invocation of flink run appears hung because of the timed waiting of the main thread in CliFrontend. My config in flink.yaml is simple:

akka.client.timeout: 60 min
#taskmanager.heap.mb: 16384
#jobmanager.heap.mb: 8192
# 25 Feb
taskmanager.numberOfTaskSlots: 4
taskmanager.heap.mb: 50000
jobmanager.heap.mb: 20000

The log does show:

02/25/2018 18:06:34 DataSink (CsvOutputFormat (path: file:/tmp/nis, delimiter: ,))(1/1) switched to FINISHED

The main thread's dump is here:
"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
  at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.ready(package.scala:169)
  at scala.concurrent.Await.ready(package.scala)
  at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
  at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
  at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
  at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
  at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
  at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
  at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
  at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
  at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
  at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
  at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
  at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
 
StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143) is the above line in Java source code.

The DataSink(Collect) on dashboard shows that it is still running [1]! I have been able to submit another smaller job which runs to completion. There is nothing suspicious in task manager or job manager log or thread dumps.

Any idea what is going on?

Regards,
Kedar

The entire thread dump of the CliFrontend JVM (that appears hung) via jstack is:
# taken on 26 Feb 2018
# 2018-02-26 09:33:19
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.131-b11 mixed mode):

"Attach Listener" #67 daemon prio=9 os_prio=31 tid=0x00007ffa6cb4f800 nid=0x125f waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Thread-1" #66 daemon prio=5 os_prio=31 tid=0x00007ffa6e1b5000 nid=0x307 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"flink-akka.remote.default-remote-dispatcher-15" #45 prio=5 os_prio=31 tid=0x00007ffa6e26d000 nid=0x6303 waiting on condition [0x0000700012140000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"Hashed wheel timer #1" #38 prio=5 os_prio=31 tid=0x00007ffa6e26a800 nid=0x6103 waiting on condition [0x000070001203d000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-14" #44 prio=5 os_prio=31 tid=0x00007ffa6dfb1000 nid=0x6003 waiting on condition [0x0000700011f3a000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.remote.default-remote-dispatcher-13" #43 prio=5 os_prio=31 tid=0x00007ffa6e911000 nid=0x9c03 waiting on condition [0x0000700011e37000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"New I/O server boss #6" #42 prio=5 os_prio=31 tid=0x00007ffa6e8cb000 nid=0x9e03 runnable [0x0000700011d34000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772b7a1e8> (a sun.nio.ch.Util$3)
- locked <0x0000000772b7a1f8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772b7a198> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #5" #41 prio=5 os_prio=31 tid=0x00007ffa6cb1b800 nid=0x5e03 runnable [0x0000700011c31000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bc9ff0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bc9fe0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca000> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #4" #40 prio=5 os_prio=31 tid=0x00007ffa6cad5800 nid=0xa003 runnable [0x0000700011b2e000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bca7c0> (a sun.nio.ch.Util$3)
- locked <0x0000000772bca7b0> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bca680> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O boss #3" #39 prio=5 os_prio=31 tid=0x00007ffa6dfd9800 nid=0x5c03 runnable [0x0000700011a2b000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcb0d8> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcb0c8> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcb0e8> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #2" #37 prio=5 os_prio=31 tid=0x00007ffa6dfac800 nid=0xa103 runnable [0x0000700011928000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bcba68> (a sun.nio.ch.Util$3)
- locked <0x0000000772bcba58> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcba78> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"New I/O worker #1" #36 prio=5 os_prio=31 tid=0x00007ffa6df07800 nid=0xa203 runnable [0x0000700011825000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0000000772bd8528> (a sun.nio.ch.Util$3)
- locked <0x0000000772bd8518> (a java.util.Collections$UnmodifiableSet)
- locked <0x0000000772bcd158> (a sun.nio.ch.KQueueSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

"flink-akka.remote.default-remote-dispatcher-5" #35 prio=5 os_prio=31 tid=0x00007ffa6f143000 nid=0xa303 waiting on condition [0x0000700011722000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b54218> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-4" #34 prio=5 os_prio=31 tid=0x00007ffa6ca88000 nid=0xa403 waiting on condition [0x000070001161f000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-3" #33 prio=5 os_prio=31 tid=0x00007ffa6b4dd000 nid=0xa603 waiting on condition [0x000070001151c000]
   java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-akka.actor.default-dispatcher-2" #32 prio=5 os_prio=31 tid=0x00007ffa6df05800 nid=0xa803 waiting on condition [0x0000700011419000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000772b2baa8> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"flink-scheduler-1" #31 prio=5 os_prio=31 tid=0x00007ffa6f0e4000 nid=0x571b sleeping[0x0000700011316000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)

"Service Thread" #9 daemon prio=9 os_prio=31 tid=0x00007ffa6c025800 nid=0x3d03 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c800 nid=0x3c03 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x00007ffa6c00b800 nid=0x4203 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x00007ffa6b00c000 nid=0x4303 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x00007ffa6b00a800 nid=0x4403 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x00007ffa6b009800 nid=0x4507 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

"Finalizer" #3 daemon prio=8 os_prio=31 tid=0x00007ffa6b03a800 nid=0x4c03 in Object.wait() [0x0000700010afe000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000006c0025d60> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x00007ffa6b037800 nid=0x3003 in Object.wait() [0x00007000109fb000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000006c002e370> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"main" #1 prio=5 os_prio=31 tid=0x00007ffa6c801800 nid=0x2703 waiting on condition [0x000070000ffdb000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x000000076bb82568> (a scala.concurrent.impl.Promise$CompletionLatch)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.ready(package.scala:169)
at scala.concurrent.Await.ready(package.scala)
at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:266)
at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:387)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:481)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:444)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:815)
at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
at StatefulVertexCentricInfluenceScore.main(StatefulVertexCentricInfluenceScore.java:143)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$6/812553708.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1807)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)

"VM Thread" os_prio=31 tid=0x00007ffa6c838800 nid=0x4e03 runnable

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011000 nid=0x1f07 runnable

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x00007ffa6c011800 nid=0x2003 runnable

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012000 nid=0x2a03 runnable

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x00007ffa6c012800 nid=0x2b03 runnable

"GC task thread#4 (ParallelGC)" os_prio=31 tid=0x00007ffa6b004000 nid=0x2c03 runnable

"GC task thread#5 (ParallelGC)" os_prio=31 tid=0x00007ffa6b008800 nid=0x5103 runnable

"GC task thread#6 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012000 nid=0x5003 runnable

"GC task thread#7 (ParallelGC)" os_prio=31 tid=0x00007ffa6b012800 nid=0x4f03 runnable

"VM Periodic Task Thread" os_prio=31 tid=0x00007ffa6c85f800 nid=0x4003 waiting on condition

JNI global references: 306

[1]
Inline image 1