Taskmanagers are quarantined


T Obi
Hello all,

We run jobs on a standalone cluster with Flink 1.3.2, and we're facing
a problem: suddenly the connection between a taskmanager and the
jobmanager times out, and the taskmanager is "quarantined" by the
jobmanager.
Once a taskmanager is quarantined, the jobs are of course restarted, but
the timeout and quarantine then happen to some taskmanagers in succession.

When a taskmanager's connection to the jobmanager timed out, its
connections to ZooKeeper and to the HDFS used for snapshots also timed
out, so the problem doesn't seem to lie in Flink itself.
However, even when a taskmanager running on the same machine as the
jobmanager times out, the jobmanager is fine at that time, so I don't
think it is an OS problem either.

Could you give us some advice on how to investigate? Thank you.



Taskmanager command line:

java -XX:+UseG1GC -Xms219136M -Xmx219136M
-XX:MaxDirectMemorySize=8388607T
-Dlog.file=/var/log/flink/flink-log-manager-taskmanager-0-flink-jp-2.log
-Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
-classpath /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
org.apache.flink.runtime.taskmanager.TaskManager --configDir
/opt/flink/flink-1.3.2/conf


Taskmanager (on flink-jp-2) log:

2017-11-22 14:09:31,595 INFO
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap
backend snapshot (File Stream Factory @
hdfs://nameservice1/user/log-manager/flink/checkpoints-data/9469db324b834e9dcf5b46428b3ae011,
synchronous part) in thread
Thread[TriggerWindow(TumblingProcessingTimeWindows(60000),
ReducingStateDescriptor{serializer=jp.geniee.reporter.executable.BuyerReporterV2Auction$$anon$12$$anon$7@d2619591,
reduceFunction=org.apache.flink.streaming.api.scala.function.util.ScalaReduceFunction@72bca894},
ProcessingTimeTrigger(),
WindowedStream.reduce(WindowedStream.java:300)) -> Map -> Map
(9/30),5,Flink Task Threads] took 142 ms.
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient
                     - DFSOutputStream ResponseProcessor exception
for block BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393
java.io.EOFException: Premature EOF: no length prefix available
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2207)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck.readFields(PipelineAck.java:176)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:867)
2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194079071_620517393 in
pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.61:50010
2017-11-22 14:12:10,039 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194080160_620520092 in
pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad
datanode 10.5.0.59:50010
2017-11-22 14:12:10,038 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194080159_621053744 in
pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode
10.5.0.52:50010
2017-11-22 14:12:10,029 INFO  org.apache.zookeeper.ClientCnxn
                     - Client session timed out, have not heard from
server in 73797ms for sessionid 0x35f5cb4184700a4, closing socket
connection and attempting reconnect
2017-11-22 14:12:10,057 WARN  org.apache.hadoop.hdfs.DFSClient
                     - Error Recovery for block
BP-390359345-10.5.0.29-1476670682927:blk_1194079870_620518999 in
pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.69:50010
2017-11-22 14:12:10,113 WARN  akka.remote.RemoteWatcher
                     - Detected unreachable:
[akka.tcp://flink@flink-jp-2:43139]
2017-11-22 14:12:10,142 INFO
org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager
 - State change: SUSPENDED
2017-11-22 14:12:10,142 WARN
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
 - Connection to ZooKeeper suspended. Can no longer retrieve the
leader from ZooKeeper.
2017-11-22 14:12:10,157 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
TaskManager akka://flink/user/taskmanager disconnects from JobManager
akka.tcp://flink@flink-jp-2:43139/user/jobmanager: JobManager is no
longer reachable
2017-11-22 14:12:10,158 INFO
org.apache.flink.runtime.taskmanager.TaskManager              -
Cancelling all computations and discarding all cached data.



Jobmanager command line:

java -Xms8192m -Xmx8192m
-Dlog.file=/var/log/flink/flink-log-manager-jobmanager-0-flink-jp-2.log
-Dlog4j.configuration=file:/opt/flink/flink-1.3.2/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.3.2/conf/logback.xml
-classpath /opt/flink/flink-1.3.2/lib/flink-python_2.11-1.3.2.jar:/opt/flink/flink-1.3.2/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/flink-1.3.2/lib/log4j-1.2.17.jar:/opt/flink/flink-1.3.2/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/flink-1.3.2/lib/flink-dist_2.11-1.3.2.jar:::
org.apache.flink.runtime.jobmanager.JobManager --configDir
/opt/flink/flink-1.3.2/conf --executionMode cluster --host flink-jp-2
--webui-port 8081


Jobmanager (on flink-jp-2) log:

2017-11-22 14:09:32,252 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator     -
Completed checkpoint 293 (125180549 bytes in 889
 ms).
2017-11-22 14:12:02,705 WARN  akka.remote.RemoteWatcher
                     - Detected unreachable:
[akka.tcp://flink@flink-jp-2:42609]
2017-11-22 14:12:02,705 INFO
org.apache.flink.runtime.jobmanager.JobManager                - Task
manager akka.tcp://flink@flink-jp-2:42609/user/taskmanager terminated.
2017-11-22 14:12:02,705 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        -
Source: lamp-auction-test -> Flat Map -> Map -> Sink:
2017-11-22-auc-log (30/30) (a853390bb17f6d58997ad994266d3df2) switched
from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
d51c4d252a8c1ff222b728ca50dbe55a @ flink-jp-2 (dataPort=37930)
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
        at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1131)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
        at akka.dispatch.Mailbox.run(Mailbox.scala:220)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Best,
Tetsuya

Re: Taskmanagers are quarantined

Chesnay Schepler
Are only some taskmanagers quarantined, or all of them?

Do the quarantined taskmanagers have anything in common?
(are the failing ones always on certain machines; do the stacktraces
reference the same hdfs datanodes)
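
To check the datanode question, a rough tally over the taskmanager log may help. The sketch below is only an illustration: the function name `count_bad_datanodes` and the regular expression are assumptions, and the sample text is copied from the "Error Recovery" warnings in the taskmanager log of the original mail.

```python
import re
from collections import Counter

# Matches the "bad datanode <ip>:<port>" tail of the HDFS client's
# "Error Recovery" warnings. \s+ tolerates the line wrapping of the mailed log.
BAD_DATANODE = re.compile(r"bad\s+datanode\s+(\d+\.\d+\.\d+\.\d+):(\d+)")


def count_bad_datanodes(log_text: str) -> Counter:
    """Return how often each datanode IP is reported as bad."""
    return Counter(ip for ip, _port in BAD_DATANODE.findall(log_text))


# Excerpt from the taskmanager log in the original mail.
sample = """\
pipeline 10.5.0.61:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.61:50010
pipeline 10.5.0.59:50010, 10.5.0.52:50010, 10.5.0.63:50010: bad
datanode 10.5.0.59:50010
pipeline 10.5.0.52:50010, 10.5.0.78:50010: bad datanode
10.5.0.52:50010
pipeline 10.5.0.69:50010, 10.5.0.59:50010, 10.5.0.74:50010: bad
datanode 10.5.0.69:50010
"""

for ip, hits in count_bad_datanodes(sample).most_common():
    print(ip, hits)
```

In this excerpt four different datanodes are each reported once, so a single excerpt does not single out one node; running the same tally over the logs of all taskmanagers would be more telling.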

Which hadoop version are you using?

From the stack trace it appears that multiple HDFS nodes are being
corrupted.
The taskmanagers time out because the connection to ZooKeeper breaks down,
at which point a taskmanager no longer knows who the leading jobmanager is
and subsequently shuts down.

On 27.11.2017 08:02, T Obi wrote:

> Hello all,
>
> We run jobs on a standalone cluster with Flink 1.3.2 and we're facing
> a problem. Suddenly a connection between a taskmanager and the
> jobmanager is timed out and the taskmanager is "quarantined" by
> jobmanager.
> Once a taskmanager is quarantined, of course jobs are restarted, but
> the timeout and quarantine happens to some taskmanager successively.
>
> When a taskmanager's connection to jobmanager was timed out, its
> connections to zookeeper and snapshot HDFS were also timed out. So the
> problem doesn't seem to be one of Flink itself.
> But though a taskmanager which runs on the same machine as jobmanager
> is timed out, jobmanager is alright at the time. So I think it is not
> OS problem too.
>
> Could you give us some advice on how to investigate? Thank you.
>
> Best,
> Tetsuya
>


Re: Taskmanagers are quarantined

T Obi
Hello Chesnay,

Thank you for answering my rough question.

Not all of the taskmanagers are quarantined at the same time, but each
taskmanager has been quarantined at least once.

We are using CDH 5.8, which is based on Hadoop 2.6.
We hadn't paid attention to the datanodes. We will check them.
However, we also use the same HDFS for MapReduce, and it seems to work fine.

I searched the archives of this mailing list for the keyword "Detected
unreachable" and found mails about GC trouble.
Although we are using G1GC, we will try enabling GC logging.
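
Alongside the GC log, one simple check for a frozen JVM is to look for long silent stretches in the taskmanager log itself. In the excerpt from the original mail, nothing is logged between 14:09:31,595 and 14:12:10,028, and the ZooKeeper client reports not hearing from the server for 73797 ms. The sketch below assumes log4j-style timestamps at the start of each entry; the name `find_gaps` and the 30-second threshold are hypothetical choices for illustration.

```python
import re
from datetime import datetime, timedelta

# Log4j-style timestamp at the start of an entry, e.g. "2017-11-22 14:09:31,595".
TIMESTAMP = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\b")


def find_gaps(lines, threshold=timedelta(seconds=30)):
    """Yield (previous, current, gap) for consecutive timestamped lines
    whose timestamps are more than `threshold` apart."""
    previous = None
    for line in lines:
        match = TIMESTAMP.match(line)
        if not match:
            continue  # wrapped lines and stack frames carry no timestamp
        current = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
        if previous is not None and current - previous > threshold:
            yield previous, current, current - previous
        previous = current


# A few entries taken from the taskmanager log in the original mail.
sample_lines = [
    "2017-11-22 14:09:31,595 INFO",
    "2017-11-22 14:12:10,028 WARN  org.apache.hadoop.hdfs.DFSClient",
    "java.io.EOFException: Premature EOF: no length prefix available",
    "2017-11-22 14:12:10,041 WARN  org.apache.hadoop.hdfs.DFSClient",
]

for start, end, gap in find_gaps(sample_lines):
    print(f"no log output from {start} to {end} ({gap.total_seconds():.3f} s)")
```

A quiet log alone does not prove a pause, since the process may simply have had nothing to log; it only narrows down which time window to compare against the GC log.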


Best,
Tetsuya

2017-11-28 1:15 GMT+09:00 Chesnay Schepler <[hidden email]>:

> Are only some taskmanagers quarantined, or all of them?
>
> Do the quarantined taskmanagers have anything in common?
> (are the failing ones always on certain machines; do the stacktraces
> reference the same hdfs datanodes)
>
> Which hadoop version are you using?
>
> From the stack-trace it appears that multiple hdfs nodes are being
> corrupted.
> The taskmanagers timeout since the connection to zookeeper breaks down,
> at which point it no longer knows who the leading jobmanager knows and
> subsequently shuts down.
>>          at
>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:44)
>>          at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>>          at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>>          at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>>          at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>          at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>          at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>          at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>          at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>          at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>          at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> Best,
>> Tetsuya
>>
>

Re: Taskmanagers are quarantined

T Obi
The Datanode warnings did not appear in every timeout case. They seem
to be raised only when a timeout happens while a snapshot is being written.

We enabled GC logging on the taskmanagers and found that something calls
System.gc() every hour.
So a full GC runs every hour, and it takes about a minute or more
in our case.
Whenever a taskmanager timed out, a full GC seems to have been
running on it. Not every one of these full GCs is caused by System.gc(),
though; we also see "Full GC (Ergonomics)" and "Full GC (Metadata GC Threshold)".
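
To check this correlation across all taskmanagers, the GC logs can be scanned for full GCs that pause longer than some threshold. The following is a minimal Java sketch, assuming JDK 8 style log lines as written with -XX:+PrintGCDetails. The class name FullGcScan, the 60-second threshold, and the sample lines are hypothetical and are not taken from this cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Scans JDK 8 style GC log lines for full GCs that paused at least a given time. */
public class FullGcScan {

    /** One full-GC entry: the cause printed in parentheses and the pause in seconds. */
    public static final class Pause {
        public final String cause;
        public final double seconds;

        Pause(String cause, double seconds) {
            this.cause = cause;
            this.seconds = seconds;
        }
    }

    // Assumed line shape: "[Full GC (<cause>)  <before>-><after>(<total>), <n> secs]".
    // The cause may itself contain parentheses, e.g. "System.gc()", so it is
    // matched lazily up to a ")" that is followed by whitespace.
    private static final Pattern FULL_GC =
            Pattern.compile("\\[Full GC \\((.+?)\\)\\s.*?, ([0-9.]+) secs\\]");

    /** Returns every full GC in the given lines whose pause is at least minSeconds. */
    public static List<Pause> longPauses(List<String> lines, double minSeconds) {
        List<Pause> result = new ArrayList<>();
        for (String line : lines) {
            Matcher m = FULL_GC.matcher(line);
            if (m.find()) {
                double seconds = Double.parseDouble(m.group(2));
                if (seconds >= minSeconds) {
                    result.add(new Pause(m.group(1), seconds));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample lines for illustration only.
        List<String> sample = Arrays.asList(
                "4321.100: [Full GC (System.gc())  150G->90G(214G), 65.2000000 secs]",
                "7395.881: [GC pause (G1 Evacuation Pause) (young), 0.0412345 secs]",
                "9669.369: [Full GC (Metadata GC Threshold)  20G->18G(214G), 12.5000000 secs]");
        for (Pause p : longPauses(sample, 60.0)) {
            System.out.println(p.cause + " paused for " + p.seconds + " s");
        }
    }
}
```

Comparing the timestamps of the reported entries with the "Detected unreachable" lines in the jobmanager log would show whether every quarantine coincides with a long full GC.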

Some of our jobs have large state, and I think that is why the full
GC takes so long.
I will try running several taskmanagers on each machine, dividing the memory among them.
I will also tune the JVM memory parameters to reduce the frequency of
"Full GC (Metadata GC Threshold)".

Best,
Tetsuya
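
The two tuning steps mentioned in this message (smaller taskmanagers, and fewer "Full GC (Metadata GC Threshold)" events) map onto a few flink-conf.yaml settings. The fragment below is only a sketch. The keys env.java.opts and taskmanager.heap.mb are documented for Flink 1.3 and the JVM flags exist in HotSpot on JDK 8, but every value is an assumption: the heap figure simply divides the 219136M from the taskmanager command line in the first message by four, and 512m for the metaspace threshold is a placeholder.

```yaml
# flink-conf.yaml (illustrative values, not a recommendation)

# Heap per taskmanager in MB. 219136 / 4 = 54784, assuming four taskmanagers
# are started on each machine instead of one.
taskmanager.heap.mb: 54784

# Extra JVM options for the Flink processes.
# -XX:+ExplicitGCInvokesConcurrent asks the collector to start a concurrent
#   cycle on System.gc() instead of a stop-the-world full GC.
# -XX:MetaspaceSize raises the initial threshold that triggers
#   "Metadata GC Threshold" collections (512m is an assumed value).
env.java.opts: -XX:+ExplicitGCInvokesConcurrent -XX:MetaspaceSize=512m
```

Smaller heaps shorten each full GC but do not remove it, so the GC log is still the place to verify whether the pauses drop below the failure-detection timeouts.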



Re: Taskmanagers are quarantined

Till Rohrmann
Hi,

you could also try increasing the heartbeat timeout via `akka.watch.heartbeat.pause`. Maybe this helps to overcome the GC pauses.
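
As a sketch of what that change could look like in flink-conf.yaml: the key names below are the ones documented for Flink 1.3, where the pause defaults to 60 s and the interval to 10 s. The value of 120 s is an assumption, chosen only because it exceeds the "about a minute or more" full GC reported in this thread.

```yaml
# flink-conf.yaml (illustrative values)

# Acceptable gap between heartbeats before Akka's DeathWatch marks the
# remote side as unreachable. Documented default: 60 s.
akka.watch.heartbeat.pause: 120 s

# Interval at which heartbeats are sent. Documented default: 10 s.
akka.watch.heartbeat.interval: 10 s
```

A larger pause also delays the detection of taskmanagers that are really dead, and it does not change the ZooKeeper client session timeout, which expired separately in the taskmanager log above.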

Cheers,
Till


Re: Taskmanagers are quarantined

Stephan Ewen-2
We also saw issues in the failure detection/quarantining with some Hadoop versions because of a subtle runtime Netty version conflict. Flink 1.4 shades Flink's / Akka's Netty; in Flink 1.3 you may need to explicitly exclude the Netty dependency pulled in through Hadoop.

Also, Hadoop version mismatches sometimes cause subtle problems. You can try to drop Flink's own Hadoop dependency and just drop in the CDH dependency or jars.

Stephan


On Nov 29, 2017 14:34, "Till Rohrmann" <[hidden email]> wrote:
Hi,

you could also try increasing the heartbeat timeout via `akka.watch.heartbeat.pause`. Maybe this helps to overcome the GC pauses.

Cheers,
Till

On Wed, Nov 29, 2017 at 12:41 PM, T Obi <[hidden email]> wrote:
Warnings of Datanode appeared not in all cases of timeout. They seem
to be raised just by timeout while snapshotting.

We output GC logs on taskmanagers and found that someone kicks
System.gc() every an hour.
So a full GC runs every an hour, and it takes about a minute or more
in our cases...
When a taskmanager is timed out, the full GC seems to be always
running on it. The full GC is not only by System.gc() but also "Full
GC (Ergonomics)" and "Full GC (Metadata GC Threshold)", though.

Some of our jobs have large state. I think this is why the full
GC takes a long time.
I will try running several taskmanagers on each machine, dividing the memory among them.
I will also tune the JVM memory parameters to reduce the frequency of
"Full GC (Metadata GC Threshold)".

Best,
Tetsuya


2017-11-28 16:30 GMT+09:00 T Obi <[hidden email]>:
> Hello Chesnay,
>
> Thank you for answering my rough question.
>
> Not all of the taskmanagers are quarantined at the same time, but each
> taskmanager has been quarantined at least once.
>
> We are using CDH 5.8, which is based on Hadoop 2.6.
> We had not paid attention to the datanodes. We will check them.
> However, we also use the HDFS for MapReduce, and it seems to work fine.
>
> I searched the archives of this mailing list for the keyword "Detected
> unreachable" and found mails about GC trouble.
> Although we are using G1GC, we will try to output the GC log.
>
>
> Best,
> Tetsuya
>
> 2017-11-28 1:15 GMT+09:00 Chesnay Schepler <[hidden email]>:
>> Are only some taskmanagers quarantined, or all of them?
>>
>> Do the quarantined taskmanagers have anything in common?
>> (are the failing ones always on certain machines; do the stacktraces
>> reference the same hdfs datanodes)
>>
>> Which hadoop version are you using?
>>
>> From the stack trace it appears that multiple HDFS nodes are being
>> corrupted.
>> The taskmanagers time out because the connection to ZooKeeper breaks down,
>> at which point a taskmanager no longer knows who the leading jobmanager is
>> and subsequently shuts down.
>>
>>


Re: Taskmanagers are quarantined

T Obi
Hello,

Thank you for all the advice. Sorry for my late response.

First, I made a small mistake.
I set `env.java.opts.taskmanager` to enable GC logging, and this
accidentally disabled the automatic setting of the `UseG1GC` option.
This means the log I was looking at was that of the Parallel GC.
When I enabled both GC logging and `UseG1GC`, I no longer saw the
hourly System.gc() in the log.
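
A minimal sketch of a `flink-conf.yaml` entry that keeps G1 enabled together with GC logging. It assumes a Java 8 JVM; the flag set and the log path are illustrative assumptions, not the exact options used here.

```yaml
# flink-conf.yaml (sketch; assumes Java 8 GC logging flags)
# As described above, setting env.java.opts.taskmanager stops Flink from
# adding -XX:+UseG1GC on its own, so the flag is listed explicitly.
# The -Xloggc path is a hypothetical example.
env.java.opts.taskmanager: -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/var/log/flink/taskmanager-gc.log
```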

However, it still seems correct that Full GC causes the timeout and quarantine.
The Full GC was caused by memory shortages.
Some of our jobs have large state, and memory shortages seem to
happen when many (sub-)tasks of those jobs are assigned to one taskmanager. The
jobmanager appears to assign tasks simply by count.
We will try to increase the parallelism of such tasks and hope that they
become better balanced.
Alternatively, we will try to decrease the size of the state.
I think it would be best if we could specify weights for tasks, though.

Thank you for telling me about `akka.watch.heartbeat.pause`.
We set a larger value for `akka.watch.heartbeat.pause`, but we also
think it is not good for all processing to pause for 60 seconds.
So we tried running two JVMs, each with half the heap size, on one
machine, and the Full GCs then took only about half the time.
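
A sketch of what these two settings could look like in `flink-conf.yaml`. The values are illustrative assumptions: `120 s` is only an example of a pause longer than the Full GC times seen in the logs, and `109568` is simply half of the `-Xmx219136M` value from the taskmanager command line. Flink may derive a slightly different `-Xmx` from `taskmanager.heap.mb`.

```yaml
# flink-conf.yaml (sketch; values are illustrative assumptions)

# Tolerate longer heartbeat gaps before a taskmanager is marked unreachable.
akka.watch.heartbeat.pause: 120 s

# Heap per taskmanager JVM in MB when two taskmanagers share one machine
# (219136 / 2 = 109568).
taskmanager.heap.mb: 109568
```

A longer pause only hides the stall from the failure detector; the smaller heap is what shortens the stall itself.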

Also, thank you for the advice about the CDH jars.
It looks a little difficult to me, but I will try it if needed.


Best,
Tetsuya

