Re: Flink job hangs using rocksDb as backend

Posted by Stefan Richter on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Flink-job-hangs-using-rocksDb-as-backend-tp21356p21396.html

Hi,

adding to what has already been said, I think there can be two orthogonal problems here: i) why is your job slowing down/getting stuck? and ii) why is cancellation blocked? As for ii), I think Stephan already gave the right reason: shutdown can take longer, and that is what gets the TM killed.

The more interesting question is still i): why is your job slowing down until the shutdown in the first place? I have two questions here. First, are you running RocksDB on EBS volumes? If so, please have a look at this thread [1], because there can be some performance pitfalls. Second, how many timers are you expecting, and how are they firing? For example, if you have a huge number of timers and the watermark makes a big jump, it can take a while until the job makes visible progress, because it has to work through all those timer callbacks first. Metrics for event throughput and for your I/O subsystem could help to show whether something is stuck/underperforming or whether there is just a lot of timer processing going on.
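
To illustrate the second point: every event-time timer whose timestamp is at or below a newly arrived watermark fires as part of processing that watermark, before regular input processing continues. A minimal sketch of a per-event timer pattern (the class name, timeout value and Long-typed events are illustrative choices, not taken from your job):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: one event-time timer per element (the element here is just its
// event timestamp). When a replayed backlog causes a large watermark jump,
// every timer up to the new watermark becomes eligible at once and onTimer()
// runs for each of them before the task can do anything else.
public class PerEventTimerFunction extends ProcessFunction<Long, Long> {

    private static final long TIMEOUT_MS = 10 * 60 * 1000L; // illustrative 10 min timeout

    @Override
    public void processElement(Long eventTimestamp, Context ctx, Collector<Long> out) throws Exception {
        ctx.timerService().registerEventTimeTimer(eventTimestamp + TIMEOUT_MS);
        out.collect(eventTimestamp);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // With the RocksDB backend, any state access here (e.g. MapState#get)
        // is a native lookup; millions of pending timers mean millions of lookups.
    }
}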

Best,
Stefan 

[1] https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3CCAKhqdDzAMDqEWiZ5B1QNdqv4+-mTvEfHbHEWrpxftLU7dV9FKw@...%3E

On 11.07.2018, at 19:31, Nico Kruber <[hidden email]> wrote:

If this is about too many timers and your application allows it, you may
also try to reduce the timer resolution and thus frequency by coalescing
them [1].
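
A rough sketch of that coalescing idea, adapted here to event-time timers (the 1-second resolution and the timeout value are illustrative choices, not taken from the linked docs or from this job):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: round timer targets down to full seconds so that many events of
// the same key share one timer instead of each event registering its own.
public class CoalescedTimerFunction extends ProcessFunction<Long, Long> {

    private static final long TIMEOUT_MS = 10 * 60 * 1000L;

    @Override
    public void processElement(Long eventTimestamp, Context ctx, Collector<Long> out) throws Exception {
        // Registering the same timestamp twice for a key is deduplicated by the
        // timer service, so all events in the same second cost only one timer.
        long coalescedTime = ((eventTimestamp + TIMEOUT_MS) / 1000) * 1000;
        ctx.timerService().registerEventTimeTimer(coalescedTime);

        // Alternative mentioned in the linked docs (as far as I recall): fire at
        // most once per watermark advance, e.g.
        // ctx.timerService().registerEventTimeTimer(
        //         ctx.timerService().currentWatermark() + 1);
        out.collect(eventTimestamp);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        // handle everything that was mapped onto this coalesced timer
    }
}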


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing

On 11/07/18 18:27, Stephan Ewen wrote:
Hi shishal!

I think there is an issue with cancellation when many timers fire at the
same time. These timers have to finish before shutdown happens, and this
seems to take a while in your case.

Did the TM process actually kill itself in the end (and get restarted)?



On Wed, Jul 11, 2018 at 9:29 AM, shishal <[hidden email]
<mailto:[hidden email]>> wrote:

   Hi,

   I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
   process function with timers on event time. For checkpointing I am using
   HDFS.
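
   For context, a minimal sketch of what such a setup typically looks like
   (the HDFS path and checkpoint interval below are placeholders, not the
   actual job's values):

   import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
   import org.apache.flink.streaming.api.TimeCharacteristic;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   // Sketch only: RocksDB keyed state, checkpoints on HDFS, event time enabled.
   public class JobSetup {
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

           // RocksDB state backend; checkpoint data goes to HDFS (placeholder path)
           env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
           env.enableCheckpointing(60_000); // interval chosen for illustration

           // ... Kafka source, keyBy, process function with event-time timers, sink ...

           env.execute("load-test-job");
       }
   }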

   I am doing load testing, so I am reading Kafka from the beginning
   (approx. 7 days of data with 50M events).

   My job gets stuck after approx. 20 minutes with no error. Thereafter the
   watermark does not progress and all checkpoints fail.

   Also, when I try to cancel my job (using the web UI), it takes several
   minutes until it finally gets cancelled, and it brings the TaskManager
   down as well.

   There are no logs while my job is hanging, but while cancelling I get the
   following error.


   2018-07-11 09:10:39,385 ERROR
   org.apache.flink.runtime.taskmanager.TaskManager              -
   ==============================================================
   ======================      FATAL      =======================
   ==============================================================

   A fatal error occurred, forcing the TaskManager to shut down: Task
   'process
   (3/6)' did not react to cancelling signal in the last 30 seconds, but is
   stuck in method:
    org.rocksdb.RocksDB.get(Native Method)
   org.rocksdb.RocksDB.get(RocksDB.java:810)
   org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
   org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
   nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
   org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
   org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
   org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
   org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
   org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
   org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
   org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
   java.lang.Thread.run(Thread.java:748)

   2018-07-11 09:10:39,390 DEBUG
   org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy

   - Actor was killed. Stopping it now.
   akka.actor.ActorKilledException: Kill
   2018-07-11 09:10:39,407 INFO
   org.apache.flink.runtime.taskmanager.TaskManager              - Stopping
   TaskManager akka://flink/user/taskmanager#-1231617791.
   2018-07-11 09:10:39,408 INFO
   org.apache.flink.runtime.taskmanager.TaskManager              -
   Cancelling
   all computations and discarding all cached data.
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Attempting to fail task externally process (3/6)
   (432fd129f3eea363334521f8c8de5198).
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Task process (3/6) is already in state CANCELING
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Attempting to fail task externally process (4/6)
   (7c6b96c9f32b067bdf8fa7c283eca2e0).
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Task process (4/6) is already in state CANCELING
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Attempting to fail task externally process (2/6)
   (a4f731797a7ea210fd0b512b0263bcd9).
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Task process (2/6) is already in state CANCELING
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Attempting to fail task externally process (1/6)
   (cd8a113779a4c00a051d78ad63bc7963).
   2018-07-11 09:10:39,409 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Task process (1/6) is already in state CANCELING
   2018-07-11 09:10:39,409 INFO
   org.apache.flink.runtime.taskmanager.TaskManager              -
   Disassociating from JobManager
   2018-07-11 09:10:39,412 INFO
   org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting
   down BLOB cache
   2018-07-11 09:10:39,431 INFO
   org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
   down BLOB cache
   2018-07-11 09:10:39,444 INFO
   org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService 
   -
   Stopping ZooKeeperLeaderRetrievalService.
   2018-07-11 09:10:39,444 DEBUG
   org.apache.flink.runtime.io.disk.iomanager.IOManager          - Shutting
   down I/O manager.
   2018-07-11 09:10:39,451 INFO
   org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
   removed spill file directory
   /tmp/flink-io-989505e5-ac33-4d56-add5-04b2ad3067b4
   2018-07-11 09:10:39,461 INFO
   org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting
   down the network environment and its components.
   2018-07-11 09:10:39,461 DEBUG
   org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting
   down network connection manager
   2018-07-11 09:10:39,462 INFO
   org.apache.flink.runtime.io.network.netty.NettyClient         - Successful
   shutdown (took 1 ms).
   2018-07-11 09:10:39,472 INFO
   org.apache.flink.runtime.io.network.netty.NettyServer         - Successful
   shutdown (took 10 ms).
   2018-07-11 09:10:39,472 DEBUG
   org.apache.flink.runtime.io.network.NetworkEnvironment        - Shutting
   down intermediate result partition manager
   2018-07-11 09:10:39,473 DEBUG
   org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
   Releasing 0 partitions because of shutdown.
   2018-07-11 09:10:39,474 DEBUG
   org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
   Successful shutdown.
   2018-07-11 09:10:39,498 INFO
   org.apache.flink.runtime.taskmanager.TaskManager              - Task
   manager
   akka://flink/user/taskmanager is completely shut down.
   2018-07-11 09:10:39,504 ERROR
   org.apache.flink.runtime.taskmanager.TaskManager              - Actor
   akka://flink/user/taskmanager#-1231617791 terminated, stopping
   process...
   2018-07-11 09:10:39,563 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Notifying TaskManager about fatal error. Task 'process (2/6)' did not
   react to cancelling signal in the last 30 seconds, but is stuck in
   method:
    org.rocksdb.RocksDB.get(Native Method)
   org.rocksdb.RocksDB.get(RocksDB.java:810)
   org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
   org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
   nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
   org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
   org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
   org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
   org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
   org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
   org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
   org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
   java.lang.Thread.run(Thread.java:748)
   .
   2018-07-11 09:10:39,575 INFO 
   org.apache.flink.runtime.taskmanager.Task                   
   - Notifying TaskManager about fatal error. Task 'process (1/6)' did not
   react to cancelling signal in the last 30 seconds, but is stuck in
   method:

   sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   java.lang.Class.newInstance(Class.java:442)
   org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:196)
   org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:399)
   org.apache.flink.contrib.streaming.state.RocksDBMapState.deserializeUserValue(RocksDBMapState.java:304)
   org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:104)
   org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
   nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
   org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
   org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
   org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
   org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
   org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
   org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
   org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
   org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
   org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
   java.lang.Thread.run(Thread.java:748)



   --
   Sent from:
   http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



--
Nico Kruber | Software Engineer
data Artisans

Follow us @dataArtisans
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Stresemannstr. 121A,10963 Berlin, Germany
data Artisans, Inc. | 1161 Mission Street, San Francisco, CA-94103, USA
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen