Flink batch job stalls on Yarn ...

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink batch job stalls on Yarn ...

kedar mhaswade
We have a fairly small graph (1.7M edges and 2.1M vertices) that we are analyzing using an iterative graph algorithm that we have written using gelly.

When we run it on single Mac, things work and the job finishes in about 5 minutes.

When we deploy the same job on Yarn with this config:
akka.client.timeout: 40 min
# 15G
taskmanager.heap.mb: 15360
jobmanager.heap.mb: 15360
taskmanager.memory.off-heap: true

The job just stalls. The dashboard shows that it just waits for the DataSink Collect() operation to finish. It looks like this:

Inline image 1

I managed to get a thread dump from one of the task managers [1] but couldn't say anything conclusively. A number of things can go wrong when we go from single host to a scheduler like Yarn, but does anyone have any clue about how to debug? What are the containers waiting for? The memory/CPU consumption on containers are negligible and this apparent "stall" is rather inexplicable. Are we missing something basic? (There's nothing in the log; last of the log records from the job manager are at [2]).

Regards,
Kedar
[1]: A task manager thread dump

2018-02-08 16:32:49
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):

"RMI TCP Connection(8)-127.0.0.1" - Thread t@190
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"JMX server connection timeout 189" - Thread t@189
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <38b27d7> (a [I)
        at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Connection(7)-127.0.0.1" - Thread t@188
   java.lang.Thread.State: RUNNABLE
        at sun.management.ThreadImpl.dumpThreads0(Native Method)
        at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:71)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:275)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:193)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:175)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:117)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:54)
        at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
        at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
        at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
        at javax.management.StandardMBean.invoke(StandardMBean.java:405)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
        at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
        at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
        at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
        at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
        at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
        at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-24" - Thread t@159
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-23" - Thread t@142
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-22" - Thread t@141
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"RMI Scheduler(0)" - Thread t@136
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6578507c> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-21" - Thread t@134
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-20" - Thread t@133
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-19" - Thread t@132
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 1" - Thread t@44
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"threadDeathWatcher-2-1" - Thread t@106
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.netty4.io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:137)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 0" - Thread t@43
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 1" - Thread t@46
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" - Thread t@102
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-17" - Thread t@101
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.hdfs.PeerCache@10f58193" - Thread t@90
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:249)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:46)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:124)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Thread-34" - Thread t@86
   java.lang.Thread.State: RUNNABLE
        at org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
        at org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
        at org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:509)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" - Thread t@85
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <47f78065> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink-MetricRegistry-thread-1" - Thread t@81
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <60abd357> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-16" - Thread t@80
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Timer-1" - Thread t@79
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <98dc2e9> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"Timer-0" - Thread t@77
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <54d0fdab> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-15" - Thread t@75
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Hashed wheel timer #1" - Thread t@37
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #12" - Thread t@71
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2f5fbbfc> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #11" - Thread t@70
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1150f42b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #10" - Thread t@69
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <76adf662> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #9" - Thread t@68
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <25c370b9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #8" - Thread t@67
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <7711bfa6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #7" - Thread t@66
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <61f6fcc3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #6" - Thread t@65
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <78711879> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #5" - Thread t@64
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4689e976> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #4" - Thread t@63
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <3c1f928e> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #3" - Thread t@62
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6a59712a> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #2" - Thread t@61
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1a1b0ec8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #1" - Thread t@60
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <135096a4> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #12" - Thread t@59
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <539beda2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #11" - Thread t@58
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <726a06da> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #10" - Thread t@57
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <30c53b11> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #9" - Thread t@56
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <27376e1b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #8" - Thread t@55
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2bcd61a2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #7" - Thread t@54
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <933eed7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #6" - Thread t@53
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <10375153> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #5" - Thread t@52
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <cecb2f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #4" - Thread t@51
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4c6c02b6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #3" - Thread t@50
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <5a3f18ba> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #2" - Thread t@49
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <70a86767> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #1" - Thread t@48
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <19ba60f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 0" - Thread t@45
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-14" - Thread t@42
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"New I/O server boss #6" - Thread t@41
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #5" - Thread t@40
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #4" - Thread t@39
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O boss #3" - Thread t@38
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #2" - Thread t@36
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #1" - Thread t@35
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-6" - Thread t@34
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-5" - Thread t@33
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-4" - Thread t@32
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-3" - Thread t@31
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-2" - Thread t@30
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-scheduler-1" - Thread t@29
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-0" - Thread t@24
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-50001" - Thread t@23
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" - Thread t@5
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" - Thread t@3
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <35efa8cb> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" - Thread t@2
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <17b79c04> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" - Thread t@1
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <52c4db3b> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.ready(package.scala:169)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:844)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:845)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1879)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1944)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.akka.AkkaUtils$.retryOnBindException(AkkaUtils.scala:755)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1691)
        at org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:149)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:145)
        at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$12/890545344.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:145)
        at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
        at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

   Locked ownable synchronizers:
        - None

[2]: job manager log:
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from CREATED to SCHEDULED.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from SCHEDULED to DEPLOYING.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSink (collect()) (8/8) (attempt #0) to hadoopworker2872-sjc1
2018-02-09 00:30:35,591 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CoGroup (Messaging) (8/8) (fb783ae066c6661cc8d5891de73a5fd4) switched from RUNNING to FINISHED.
2018-02-09 00:30:35,592 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from DEPLOYING to RUNNING.
2018-02-09 00:30:36,060 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (7/8) (c40e67da5f4f69c904635a1afa195d62) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (8/8) (5b71deb07a1535f0e48693ed116b6878) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (6/8) (03f067c00462327c8184ad82bb8a652b) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (2/8) (7532734db562d839d9b37f5a7685a767) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,177 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (5/8) (2a199e2d38809b783d402666a5c0865d) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (4/8) (4947f22d1c3de66cb29ff649835287be) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (1/8) (80d462c61a7f81af697b49cdc2bf047e) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (3/8) (7951220955b77b4719de17eea1ff6054) switched from RUNNING to FINISHED.

Reply | Threaded
Open this post in threaded view
|

Re: Flink batch job stalls on Yarn ...

Till Rohrmann
Hi Kedar,

the stack traces don't show that the TaskManager is doing anything. Thus, I would assume that it has already finished processing the job's tasks. You should see in the web ui which tasks have finished and which not.

It seems that you are using collect(). How large is the expected size of the result you're collecting? Collect can only retrieve results which are smaller than the configured akka.framesize. Per default it is set to 10 MB. This might cause the problem you're observing. But without the logs it is hard to tell.

Cheers,
Till


On Fri, Feb 9, 2018 at 1:50 AM, kedar mhaswade <[hidden email]> wrote:
We have a fairly small graph (1.7M edges and 2.1M vertices) that we are analyzing using an iterative graph algorithm that we have written using gelly.

When we run it on single Mac, things work and the job finishes in about 5 minutes.

When we deploy the same job on Yarn with this config:
akka.client.timeout: 40 min
# 15G
taskmanager.heap.mb: 15360
jobmanager.heap.mb: 15360
taskmanager.memory.off-heap: true

The job just stalls. The dashboard shows that it just waits for the DataSink Collect() operation to finish. It looks like this:

Inline image 1

I managed to get a thread dump from one of the task managers [1] but couldn't say anything conclusively. A number of things can go wrong when we go from single host to a scheduler like Yarn, but does anyone have any clue about how to debug? What are the containers waiting for? The memory/CPU consumption on containers are negligible and this apparent "stall" is rather inexplicable. Are we missing something basic? (There's nothing in the log; last of the log records from the job manager are at [2]).

Regards,
Kedar
[1]: A task manager thread dump

2018-02-08 16:32:49
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):

"RMI TCP Connection(8)-127.0.0.1" - Thread t@190
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"JMX server connection timeout 189" - Thread t@189
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <38b27d7> (a [I)
        at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Connection(7)-127.0.0.1" - Thread t@188
   java.lang.Thread.State: RUNNABLE
        at sun.management.ThreadImpl.dumpThreads0(Native Method)
        at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:71)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:275)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:193)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:175)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:117)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:54)
        at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
        at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
        at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
        at javax.management.StandardMBean.invoke(StandardMBean.java:405)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
        at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
        at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
        at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
        at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
        at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
        at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-24" - Thread t@159
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-23" - Thread t@142
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-22" - Thread t@141
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"RMI Scheduler(0)" - Thread t@136
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6578507c> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-21" - Thread t@134
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-20" - Thread t@133
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-19" - Thread t@132
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 1" - Thread t@44
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"threadDeathWatcher-2-1" - Thread t@106
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.netty4.io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:137)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 0" - Thread t@43
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 1" - Thread t@46
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" - Thread t@102
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-17" - Thread t@101
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.hdfs.PeerCache@10f58193" - Thread t@90
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:249)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:46)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:124)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Thread-34" - Thread t@86
   java.lang.Thread.State: RUNNABLE
        at org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
        at org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
        at org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:509)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" - Thread t@85
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <47f78065> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink-MetricRegistry-thread-1" - Thread t@81
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <60abd357> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-16" - Thread t@80
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Timer-1" - Thread t@79
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <98dc2e9> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"Timer-0" - Thread t@77
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <54d0fdab> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-15" - Thread t@75
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Hashed wheel timer #1" - Thread t@37
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #12" - Thread t@71
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2f5fbbfc> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #11" - Thread t@70
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1150f42b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #10" - Thread t@69
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <76adf662> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #9" - Thread t@68
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <25c370b9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #8" - Thread t@67
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <7711bfa6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #7" - Thread t@66
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <61f6fcc3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #6" - Thread t@65
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <78711879> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #5" - Thread t@64
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4689e976> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #4" - Thread t@63
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <3c1f928e> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #3" - Thread t@62
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6a59712a> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #2" - Thread t@61
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1a1b0ec8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #1" - Thread t@60
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <135096a4> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #12" - Thread t@59
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <539beda2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #11" - Thread t@58
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <726a06da> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #10" - Thread t@57
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <30c53b11> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #9" - Thread t@56
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <27376e1b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #8" - Thread t@55
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2bcd61a2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #7" - Thread t@54
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <933eed7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #6" - Thread t@53
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <10375153> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #5" - Thread t@52
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <cecb2f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #4" - Thread t@51
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4c6c02b6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #3" - Thread t@50
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <5a3f18ba> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #2" - Thread t@49
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <70a86767> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #1" - Thread t@48
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <19ba60f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 0" - Thread t@45
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-14" - Thread t@42
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"New I/O server boss #6" - Thread t@41
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #5" - Thread t@40
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #4" - Thread t@39
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O boss #3" - Thread t@38
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #2" - Thread t@36
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #1" - Thread t@35
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-6" - Thread t@34
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-5" - Thread t@33
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-4" - Thread t@32
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-3" - Thread t@31
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-2" - Thread t@30
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-scheduler-1" - Thread t@29
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-0" - Thread t@24
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-50001" - Thread t@23
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" - Thread t@5
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" - Thread t@3
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <35efa8cb> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" - Thread t@2
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <17b79c04> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" - Thread t@1
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <52c4db3b> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.ready(package.scala:169)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:844)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:845)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1879)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1944)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.akka.AkkaUtils$.retryOnBindException(AkkaUtils.scala:755)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1691)
        at org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:149)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:145)
        at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$12/890545344.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:145)
        at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
        at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

   Locked ownable synchronizers:
        - None

[2]: job manager log:
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from CREATED to SCHEDULED.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from SCHEDULED to DEPLOYING.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSink (collect()) (8/8) (attempt #0) to hadoopworker2872-sjc1
2018-02-09 00:30:35,591 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CoGroup (Messaging) (8/8) (fb783ae066c6661cc8d5891de73a5fd4) switched from RUNNING to FINISHED.
2018-02-09 00:30:35,592 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from DEPLOYING to RUNNING.
2018-02-09 00:30:36,060 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (7/8) (c40e67da5f4f69c904635a1afa195d62) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (8/8) (5b71deb07a1535f0e48693ed116b6878) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (6/8) (03f067c00462327c8184ad82bb8a652b) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (2/8) (7532734db562d839d9b37f5a7685a767) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,177 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (5/8) (2a199e2d38809b783d402666a5c0865d) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (4/8) (4947f22d1c3de66cb29ff649835287be) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (1/8) (80d462c61a7f81af697b49cdc2bf047e) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (3/8) (7951220955b77b4719de17eea1ff6054) switched from RUNNING to FINISHED.


Reply | Threaded
Open this post in threaded view
|

Re: Flink batch job stalls on Yarn ...

kedar mhaswade
Hi Till,

Thank you for your email. I tried with an increased parallelism (i.e. I ran my flink job with -p 32 or -p 16 instead of -p 8) and then it worked with the default akka.framesize of 10 MiB (the yarn cluster size is 32 containers). But the issue is that otherwise it just stalls and gives an impression as if nothing is happening -- is there a log setting that I should do to make Task managers log more information in such cases? Another issue is that of concurrent container failures. If the containers are restarted by Yarn, the computation is not rerun (ala Spark) and the entire job fails. Thus, in general same flink job runs to completion only about 50% of the times on Yarn. Is this something that I should expect?

Regards,
Kedar


On Fri, Feb 9, 2018 at 1:50 AM, Till Rohrmann <[hidden email]> wrote:
Hi Kedar,

the stack traces don't show that the TaskManager is doing anything. Thus, I would assume that it has already finished processing the job's tasks. You should see in the web ui which tasks have finished and which not.

It seems that you are using collect(). How large is the expected size of the result you're collecting? Collect can only retrieve results which are smaller than the configured akka.framesize. Per default it is set to 10 MB. This might cause the problem you're observing. But without the logs it is hard to tell.

Cheers,
Till


On Fri, Feb 9, 2018 at 1:50 AM, kedar mhaswade <[hidden email]> wrote:
We have a fairly small graph (1.7M edges and 2.1M vertices) that we are analyzing using an iterative graph algorithm that we have written using gelly.

When we run it on single Mac, things work and the job finishes in about 5 minutes.

When we deploy the same job on Yarn with this config:
akka.client.timeout: 40 min
# 15G
taskmanager.heap.mb: 15360
jobmanager.heap.mb: 15360
taskmanager.memory.off-heap: true

The job just stalls. The dashboard shows that it just waits for the DataSink Collect() operation to finish. It looks like this:

Inline image 1

I managed to get a thread dump from one of the task managers [1] but couldn't say anything conclusively. A number of things can go wrong when we go from single host to a scheduler like Yarn, but does anyone have any clue about how to debug? What are the containers waiting for? The memory/CPU consumption on containers are negligible and this apparent "stall" is rather inexplicable. Are we missing something basic? (There's nothing in the log; last of the log records from the job manager are at [2]).

Regards,
Kedar
[1]: A task manager thread dump

2018-02-08 16:32:49
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):

"RMI TCP Connection(8)-127.0.0.1" - Thread t@190
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"JMX server connection timeout 189" - Thread t@189
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <38b27d7> (a [I)
        at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Connection(7)-127.0.0.1" - Thread t@188
   java.lang.Thread.State: RUNNABLE
        at sun.management.ThreadImpl.dumpThreads0(Native Method)
        at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:71)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:275)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:193)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:175)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:117)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:54)
        at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
        at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
        at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
        at javax.management.StandardMBean.invoke(StandardMBean.java:405)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
        at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
        at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
        at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
        at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
        at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
        at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-24" - Thread t@159
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-23" - Thread t@142
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-22" - Thread t@141
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"RMI Scheduler(0)" - Thread t@136
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6578507c> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-21" - Thread t@134
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-20" - Thread t@133
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-19" - Thread t@132
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 1" - Thread t@44
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"threadDeathWatcher-2-1" - Thread t@106
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.netty4.io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:137)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 0" - Thread t@43
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 1" - Thread t@46
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" - Thread t@102
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-17" - Thread t@101
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.hdfs.PeerCache@10f58193" - Thread t@90
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:249)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:46)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:124)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Thread-34" - Thread t@86
   java.lang.Thread.State: RUNNABLE
        at org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
        at org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
        at org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:509)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" - Thread t@85
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <47f78065> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink-MetricRegistry-thread-1" - Thread t@81
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <60abd357> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-16" - Thread t@80
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Timer-1" - Thread t@79
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <98dc2e9> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"Timer-0" - Thread t@77
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <54d0fdab> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-15" - Thread t@75
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Hashed wheel timer #1" - Thread t@37
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #12" - Thread t@71
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2f5fbbfc> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #11" - Thread t@70
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1150f42b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #10" - Thread t@69
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <76adf662> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #9" - Thread t@68
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <25c370b9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #8" - Thread t@67
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <7711bfa6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #7" - Thread t@66
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <61f6fcc3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #6" - Thread t@65
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <78711879> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #5" - Thread t@64
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4689e976> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #4" - Thread t@63
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <3c1f928e> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #3" - Thread t@62
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6a59712a> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #2" - Thread t@61
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1a1b0ec8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #1" - Thread t@60
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <135096a4> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #12" - Thread t@59
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <539beda2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #11" - Thread t@58
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <726a06da> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #10" - Thread t@57
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <30c53b11> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #9" - Thread t@56
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <27376e1b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #8" - Thread t@55
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2bcd61a2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #7" - Thread t@54
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <933eed7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #6" - Thread t@53
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <10375153> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #5" - Thread t@52
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <cecb2f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #4" - Thread t@51
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4c6c02b6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #3" - Thread t@50
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <5a3f18ba> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #2" - Thread t@49
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <70a86767> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #1" - Thread t@48
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <19ba60f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 0" - Thread t@45
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-14" - Thread t@42
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"New I/O server boss #6" - Thread t@41
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #5" - Thread t@40
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #4" - Thread t@39
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O boss #3" - Thread t@38
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #2" - Thread t@36
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #1" - Thread t@35
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-6" - Thread t@34
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-5" - Thread t@33
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-4" - Thread t@32
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-3" - Thread t@31
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-2" - Thread t@30
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-scheduler-1" - Thread t@29
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-0" - Thread t@24
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-50001" - Thread t@23
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" - Thread t@5
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" - Thread t@3
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <35efa8cb> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" - Thread t@2
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <17b79c04> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" - Thread t@1
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <52c4db3b> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.ready(package.scala:169)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:844)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:845)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1879)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1944)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.akka.AkkaUtils$.retryOnBindException(AkkaUtils.scala:755)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1691)
        at org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:149)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:145)
        at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$12/890545344.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:145)
        at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
        at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

   Locked ownable synchronizers:
        - None

[2]: job manager log:
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from CREATED to SCHEDULED.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from SCHEDULED to DEPLOYING.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSink (collect()) (8/8) (attempt #0) to hadoopworker2872-sjc1
2018-02-09 00:30:35,591 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CoGroup (Messaging) (8/8) (fb783ae066c6661cc8d5891de73a5fd4) switched from RUNNING to FINISHED.
2018-02-09 00:30:35,592 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from DEPLOYING to RUNNING.
2018-02-09 00:30:36,060 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (7/8) (c40e67da5f4f69c904635a1afa195d62) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (8/8) (5b71deb07a1535f0e48693ed116b6878) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (6/8) (03f067c00462327c8184ad82bb8a652b) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (2/8) (7532734db562d839d9b37f5a7685a767) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,177 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (5/8) (2a199e2d38809b783d402666a5c0865d) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (4/8) (4947f22d1c3de66cb29ff649835287be) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (1/8) (80d462c61a7f81af697b49cdc2bf047e) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (3/8) (7951220955b77b4719de17eea1ff6054) switched from RUNNING to FINISHED.



Reply | Threaded
Open this post in threaded view
|

Re: Flink batch job stalls on Yarn ...

Till Rohrmann
Hi Kedar,

if you set logging to debug and set akka.log.lifecycle.events to true in the configuration you should see some log statements that messages were dropped due to exceeding the framesize.

In order to recover from Yarn container failures, you have to set yarn.maximum-failed-containers to something greater than 0. You should also check which RestartStrategy you've set. Moreover I would recommend enabling HA mode [1]. That way you will also recover from JobManager failures. For Yarn it might 

What's not so clear to me is why the parallelism change made it work. At the very latest when the JobManager communicates to the client all elements of the DataSet are aggregated and then sent to the client due to the collect call. Thus, changing the parallelism should not influence this size. 


Cheers,
Till

On Sat, Feb 10, 2018 at 12:26 AM, kedar mhaswade <[hidden email]> wrote:
Hi Till,

Thank you for your email. I tried with an increased parallelism (i.e. I ran my flink job with -p 32 or -p 16 instead of -p 8) and then it worked with the default akka.framesize of 10 MiB (the yarn cluster size is 32 containers). But the issue is that otherwise it just stalls and gives an impression as if nothing is happening -- is there a log setting that I should do to make Task managers log more information in such cases? Another issue is that of concurrent container failures. If the containers are restarted by Yarn, the computation is not rerun (ala Spark) and the entire job fails. Thus, in general same flink job runs to completion only about 50% of the times on Yarn. Is this something that I should expect?

Regards,
Kedar


On Fri, Feb 9, 2018 at 1:50 AM, Till Rohrmann <[hidden email]> wrote:
Hi Kedar,

the stack traces don't show that the TaskManager is doing anything. Thus, I would assume that it has already finished processing the job's tasks. You should see in the web ui which tasks have finished and which not.

It seems that you are using collect(). How large is the expected size of the result you're collecting? Collect can only retrieve results which are smaller than the configured akka.framesize. Per default it is set to 10 MB. This might cause the problem you're observing. But without the logs it is hard to tell.

Cheers,
Till


On Fri, Feb 9, 2018 at 1:50 AM, kedar mhaswade <[hidden email]> wrote:
We have a fairly small graph (1.7M edges and 2.1M vertices) that we are analyzing using an iterative graph algorithm that we have written using gelly.

When we run it on single Mac, things work and the job finishes in about 5 minutes.

When we deploy the same job on Yarn with this config:
akka.client.timeout: 40 min
# 15G
taskmanager.heap.mb: 15360
jobmanager.heap.mb: 15360
taskmanager.memory.off-heap: true

The job just stalls. The dashboard shows that it just waits for the DataSink Collect() operation to finish. It looks like this:

Inline image 1

I managed to get a thread dump from one of the task managers [1] but couldn't say anything conclusively. A number of things can go wrong when we go from single host to a scheduler like Yarn, but does anyone have any clue about how to debug? What are the containers waiting for? The memory/CPU consumption on containers are negligible and this apparent "stall" is rather inexplicable. Are we missing something basic? (There's nothing in the log; last of the log records from the job manager are at [2]).

Regards,
Kedar
[1]: A task manager thread dump

2018-02-08 16:32:49
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):

"RMI TCP Connection(8)-127.0.0.1" - Thread t@190
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"JMX server connection timeout 189" - Thread t@189
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <38b27d7> (a [I)
        at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Connection(7)-127.0.0.1" - Thread t@188
   java.lang.Thread.State: RUNNABLE
        at sun.management.ThreadImpl.dumpThreads0(Native Method)
        at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:71)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:275)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:193)
        at com.sun.jmx.mbeanserver.ConvertingMethod.invokeWithOpenReturn(ConvertingMethod.java:175)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:117)
        at com.sun.jmx.mbeanserver.MXBeanIntrospector.invokeM2(MXBeanIntrospector.java:54)
        at com.sun.jmx.mbeanserver.MBeanIntrospector.invokeM(MBeanIntrospector.java:237)
        at com.sun.jmx.mbeanserver.PerInterface.invoke(PerInterface.java:138)
        at com.sun.jmx.mbeanserver.MBeanSupport.invoke(MBeanSupport.java:252)
        at javax.management.StandardMBean.invoke(StandardMBean.java:405)
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
        at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
        at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
        at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
        at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
        at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
        at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
        at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:346)
        at sun.rmi.transport.Transport$1.run(Transport.java:200)
        at sun.rmi.transport.Transport$1.run(Transport.java:197)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
        at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$22/1785671844.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-24" - Thread t@159
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-23" - Thread t@142
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-22" - Thread t@141
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"RMI Scheduler(0)" - Thread t@136
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6578507c> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-21" - Thread t@134
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-20" - Thread t@133
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-19" - Thread t@132
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 1" - Thread t@44
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"threadDeathWatcher-2-1" - Thread t@106
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.netty4.io.netty.util.ThreadDeathWatcher$Watcher.run(ThreadDeathWatcher.java:137)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Client (0) Thread 0" - Thread t@43
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 1" - Thread t@46
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-18" - Thread t@102
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-17" - Thread t@101
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.hdfs.PeerCache@10f58193" - Thread t@90
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:249)
        at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:46)
        at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:124)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Thread-34" - Thread t@86
   java.lang.Thread.State: RUNNABLE
        at org.apache.hadoop.net.unix.DomainSocketWatcher.doPoll0(Native Method)
        at org.apache.hadoop.net.unix.DomainSocketWatcher.access$900(DomainSocketWatcher.java:52)
        at org.apache.hadoop.net.unix.DomainSocketWatcher$2.run(DomainSocketWatcher.java:509)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner" - Thread t@85
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <47f78065> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Flink-MetricRegistry-thread-1" - Thread t@81
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <60abd357> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-16" - Thread t@80
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Timer-1" - Thread t@79
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <98dc2e9> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"Timer-0" - Thread t@77
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <54d0fdab> (a java.util.TaskQueue)
        at java.util.TimerThread.mainLoop(Timer.java:552)
        at java.util.TimerThread.run(Timer.java:505)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-15" - Thread t@75
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"Hashed wheel timer #1" - Thread t@37
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.waitForNextTick(HashedWheelTimer.java:445)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:364)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #12" - Thread t@71
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2f5fbbfc> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #11" - Thread t@70
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1150f42b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #10" - Thread t@69
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <76adf662> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #9" - Thread t@68
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <25c370b9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #8" - Thread t@67
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <7711bfa6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #7" - Thread t@66
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <61f6fcc3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #6" - Thread t@65
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <78711879> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #5" - Thread t@64
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4689e976> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #4" - Thread t@63
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <3c1f928e> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #3" - Thread t@62
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6a59712a> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #2" - Thread t@61
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <1a1b0ec8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager reader thread #1" - Thread t@60
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <135096a4> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:380)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #12" - Thread t@59
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <539beda2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #11" - Thread t@58
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <726a06da> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #10" - Thread t@57
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <30c53b11> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #9" - Thread t@56
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <27376e1b> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #8" - Thread t@55
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2bcd61a2> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #7" - Thread t@54
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <933eed7> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #6" - Thread t@53
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <10375153> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #5" - Thread t@52
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <cecb2f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #4" - Thread t@51
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <4c6c02b6> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #3" - Thread t@50
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <5a3f18ba> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #2" - Thread t@49
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <70a86767> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"IOManager writer thread #1" - Thread t@48
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <19ba60f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:486)

   Locked ownable synchronizers:
        - None

"Flink Netty Server (0) Thread 0" - Thread t@45
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:622)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:310)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-14" - Thread t@42
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"New I/O server boss #6" - Thread t@41
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:101)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.select(NioServerBoss.java:163)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #5" - Thread t@40
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #4" - Thread t@39
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O boss #3" - Thread t@38
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #2" - Thread t@36
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"New I/O worker #1" - Thread t@35
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.SelectorUtil.select(SelectorUtil.java:68)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.select(AbstractNioSelector.java:434)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:212)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
        at org.apache.flink.shaded.akka.org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
        at org.apache.flink.shaded.akka.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-6" - Thread t@34
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.remote.default-remote-dispatcher-5" - Thread t@33
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <2d989af9> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2067)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-4" - Thread t@32
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-3" - Thread t@31
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-akka.actor.default-dispatcher-2" - Thread t@30
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <6b36d6cd> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

   Locked ownable synchronizers:
        - None

"flink-scheduler-1" - Thread t@29
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at akka.actor.LightArrayRevolverScheduler.waitNanos(LightArrayRevolverScheduler.scala:85)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:266)
        at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-0" - Thread t@24
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"RMI TCP Accept-50001" - Thread t@23
   java.lang.Thread.State: RUNNABLE
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
        at java.net.ServerSocket.implAccept(ServerSocket.java:545)
        at java.net.ServerSocket.accept(ServerSocket.java:513)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
        at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - None

"Signal Dispatcher" - Thread t@5
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
        - None

"Finalizer" - Thread t@3
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <35efa8cb> (a java.lang.ref.ReferenceQueue$Lock)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
        at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
        at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

   Locked ownable synchronizers:
        - None

"Reference Handler" - Thread t@2
   java.lang.Thread.State: WAITING
        at java.lang.Object.wait(Native Method)
        - waiting on <17b79c04> (a java.lang.ref.Reference$Lock)
        at java.lang.Object.wait(Object.java:502)
        at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
        at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

   Locked ownable synchronizers:
        - None

"main" - Thread t@1
   java.lang.Thread.State: WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <52c4db3b> (a scala.concurrent.impl.Promise$CompletionLatch)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:169)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.ready(package.scala:169)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:844)
        at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:845)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1879)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply$mcV$sp(TaskManager.scala:1944)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.apply(TaskManager.scala:1922)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.flink.runtime.akka.AkkaUtils$.retryOnBindException(AkkaUtils.scala:755)
        at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskManager(TaskManager.scala:1922)
        at org.apache.flink.runtime.taskmanager.TaskManager$.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala:1691)
        at org.apache.flink.runtime.taskmanager.TaskManager.selectNetworkInterfaceAndRunTaskManager(TaskManager.scala)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:149)
        at org.apache.flink.yarn.YarnTaskManagerRunner$1.call(YarnTaskManagerRunner.java:145)
        at org.apache.flink.runtime.security.HadoopSecurityContext$$Lambda$12/890545344.run(Unknown Source)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.yarn.YarnTaskManagerRunner.runYarnTaskManager(YarnTaskManagerRunner.java:145)
        at org.apache.flink.yarn.YarnTaskManager$.main(YarnTaskManager.scala:65)
        at org.apache.flink.yarn.YarnTaskManager.main(YarnTaskManager.scala)

   Locked ownable synchronizers:
        - None

[2]: job manager log:
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from CREATED to SCHEDULED.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from SCHEDULED to DEPLOYING.
2018-02-09 00:30:35,586 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Deploying DataSink (collect()) (8/8) (attempt #0) to hadoopworker2872-sjc1
2018-02-09 00:30:35,591 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - CoGroup (Messaging) (8/8) (fb783ae066c6661cc8d5891de73a5fd4) switched from RUNNING to FINISHED.
2018-02-09 00:30:35,592 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DataSink (collect()) (8/8) (0f74d5a4ecbfce51abb42c71029fbe6c) switched from DEPLOYING to RUNNING.
2018-02-09 00:30:36,060 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (7/8) (c40e67da5f4f69c904635a1afa195d62) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,089 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (8/8) (5b71deb07a1535f0e48693ed116b6878) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,132 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (6/8) (03f067c00462327c8184ad82bb8a652b) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (2/8) (7532734db562d839d9b37f5a7685a767) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,177 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (5/8) (2a199e2d38809b783d402666a5c0865d) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,198 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (4/8) (4947f22d1c3de66cb29ff649835287be) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,207 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (1/8) (80d462c61a7f81af697b49cdc2bf047e) switched from RUNNING to FINISHED.
2018-02-09 00:30:36,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - IterationHead(Scatter-gather iteration (com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$VertexRankUpdater@191a709b | com.uber.usecurity.ugraph.olap.flink.standalone.api.SimpleNetworkInfluenceScore1$RankMessenger@77c7ed8e)) (3/8) (7951220955b77b4719de17eea1ff6054) switched from RUNNING to FINISHED.